From 97ae32388c4fdb4a48ae0892e8a9142da062d032 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Thu, 16 Feb 2012 00:02:15 +0000 Subject: [PATCH] fix for: https://issues.apache.org/jira/browse/AMQ-3714 lazy init the Scheduler object in ActiveMQConnection. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1244796 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQConnection.java | 31 ++++++++++++++----- .../activemq/ActiveMQMessageConsumer.java | 9 +++--- .../org/apache/activemq/ActiveMQSession.java | 22 +++++++------ 3 files changed, 41 insertions(+), 21 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 8727e7b95f..ca7b495fb0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -33,6 +33,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; + import javax.jms.Connection; import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; @@ -51,6 +52,7 @@ import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicSession; import javax.jms.XAConnection; + import org.apache.activemq.advisory.DestinationSource; import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.command.ActiveMQDestination; @@ -190,7 +192,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private boolean useDedicatedTaskRunner; protected volatile CountDownLatch transportInterruptionProcessingComplete; private long consumerFailoverRedeliveryWaitPeriod; - private final Scheduler scheduler; + private Scheduler scheduler; private boolean messagePrioritySupported = true; private boolean transactedIndividualAck = false; private boolean nonBlockingRedelivery = false; @@ -230,8 +232,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon this.factoryStats.addConnection(this); this.timeCreated = System.currentTimeMillis(); this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant()); - this.scheduler = new Scheduler("ActiveMQConnection["+uniqueId+"] Scheduler"); - this.scheduler.start(); } protected void setUserName(String userName) { @@ -622,9 +622,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon advisoryConsumer.dispose(); advisoryConsumer = null; } - if (this.scheduler != null) { + + Scheduler scheduler = this.scheduler; + if (scheduler != null) { try { - this.scheduler.stop(); + scheduler.stop(); } catch (Exception e) { JMSException ex = JMSExceptionSupport.create(e); throw ex; @@ -2408,8 +2410,23 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon return consumerFailoverRedeliveryWaitPeriod; } - protected Scheduler getScheduler() { - return this.scheduler; + protected Scheduler getScheduler() throws JMSException { + Scheduler result = scheduler; + if (result == null) { + synchronized (this) { + result = scheduler; + if (result == null) { + checkClosed(); + try { + result = scheduler = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler"); + scheduler.start(); + } catch(Exception e) { + throw JMSExceptionSupport.create(e); + } + } + } + } + return result; } protected ThreadPoolExecutor getExecutor() { diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index a695ea1b87..d712f5f9ae 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; + import javax.jms.IllegalStateException; import javax.jms.InvalidDestinationException; import javax.jms.JMSException; @@ -36,6 +37,7 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.TransactionRolledBackException; + import org.apache.activemq.blob.BlobDownloader; import org.apache.activemq.command.ActiveMQBlobMessage; import org.apache.activemq.command.ActiveMQDestination; @@ -54,7 +56,6 @@ import org.apache.activemq.management.JMSConsumerStatsImpl; import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsImpl; import org.apache.activemq.selector.SelectorParser; -import org.apache.activemq.thread.Scheduler; import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.util.Callback; import org.apache.activemq.util.IntrospectionSupport; @@ -109,7 +110,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class); - protected final Scheduler scheduler; protected final ActiveMQSession session; protected final ConsumerInfo info; @@ -207,7 +207,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } this.session = session; - this.scheduler = session.getScheduler(); this.redeliveryPolicy = session.connection.getRedeliveryPolicy(); setTransformer(session.getTransformer()); @@ -1192,7 +1191,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC new LinkedList(deliveredMessages); // Start up the delivery again a little later. - scheduler.executeAfterDelay(new Runnable() { + session.getScheduler().executeAfterDelay(new Runnable() { public void run() { try { if (!unconsumedMessages.isClosed()) { @@ -1216,7 +1215,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) { // Start up the delivery again a little later. - scheduler.executeAfterDelay(new Runnable() { + session.getScheduler().executeAfterDelay(new Runnable() { public void run() { try { if (started.get()) { diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index 5dc98e24cf..1d9966b33e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; + import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.IllegalStateException; @@ -53,6 +54,7 @@ import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.jms.TransactionRolledBackException; + import org.apache.activemq.blob.BlobDownloader; import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.blob.BlobUploader; @@ -198,7 +200,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta } private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class); - private final Scheduler scheduler; private final ThreadPoolExecutor connectionExecutor; protected int acknowledgementMode; @@ -251,7 +252,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta this.connection.asyncSendPacket(info); setTransformer(connection.getTransformer()); setBlobTransferPolicy(connection.getBlobTransferPolicy()); - this.scheduler=connection.getScheduler(); this.connectionExecutor=connection.getExecutor(); this.executor = new ActiveMQSessionExecutor(this); connection.addSession(this); @@ -659,10 +659,14 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta // for (final ActiveMQMessageConsumer consumer : consumers) { consumer.inProgressClearRequired(); - scheduler.executeAfterDelay(new Runnable() { - public void run() { - consumer.clearMessagesInProgress(); - }}, 0l); + try { + connection.getScheduler().executeAfterDelay(new Runnable() { + public void run() { + consumer.clearMessagesInProgress(); + }}, 0l); + } catch (JMSException e) { + connection.onClientInternalException(e); + } } } @@ -892,7 +896,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta for (int i = 0; i < redeliveryCounter; i++) { redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); } - scheduler.executeAfterDelay(new Runnable() { + connection.getScheduler().executeAfterDelay(new Runnable() { public void run() { ((ActiveMQDispatcher)md.getConsumer()).dispatch(md); @@ -2051,8 +2055,8 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta } } - protected Scheduler getScheduler() { - return this.scheduler; + protected Scheduler getScheduler() throws JMSException { + return this.connection.getScheduler(); } protected ThreadPoolExecutor getConnectionExecutor() {