fix for: https://issues.apache.org/jira/browse/AMQ-3714 lazy init the Scheduler object in ActiveMQConnection.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1244796 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-02-16 00:02:15 +00:00
parent 98b7dcdd1d
commit 97ae32388c
3 changed files with 41 additions and 21 deletions

View File

@ -33,6 +33,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionConsumer; import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData; import javax.jms.ConnectionMetaData;
@ -51,6 +52,7 @@ import javax.jms.Topic;
import javax.jms.TopicConnection; import javax.jms.TopicConnection;
import javax.jms.TopicSession; import javax.jms.TopicSession;
import javax.jms.XAConnection; import javax.jms.XAConnection;
import org.apache.activemq.advisory.DestinationSource; import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -190,7 +192,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private boolean useDedicatedTaskRunner; private boolean useDedicatedTaskRunner;
protected volatile CountDownLatch transportInterruptionProcessingComplete; protected volatile CountDownLatch transportInterruptionProcessingComplete;
private long consumerFailoverRedeliveryWaitPeriod; private long consumerFailoverRedeliveryWaitPeriod;
private final Scheduler scheduler; private Scheduler scheduler;
private boolean messagePrioritySupported = true; private boolean messagePrioritySupported = true;
private boolean transactedIndividualAck = false; private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false; private boolean nonBlockingRedelivery = false;
@ -230,8 +232,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.factoryStats.addConnection(this); this.factoryStats.addConnection(this);
this.timeCreated = System.currentTimeMillis(); this.timeCreated = System.currentTimeMillis();
this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant()); this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
this.scheduler = new Scheduler("ActiveMQConnection["+uniqueId+"] Scheduler");
this.scheduler.start();
} }
protected void setUserName(String userName) { protected void setUserName(String userName) {
@ -622,9 +622,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
advisoryConsumer.dispose(); advisoryConsumer.dispose();
advisoryConsumer = null; advisoryConsumer = null;
} }
if (this.scheduler != null) {
Scheduler scheduler = this.scheduler;
if (scheduler != null) {
try { try {
this.scheduler.stop(); scheduler.stop();
} catch (Exception e) { } catch (Exception e) {
JMSException ex = JMSExceptionSupport.create(e); JMSException ex = JMSExceptionSupport.create(e);
throw ex; throw ex;
@ -2408,8 +2410,23 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
return consumerFailoverRedeliveryWaitPeriod; return consumerFailoverRedeliveryWaitPeriod;
} }
protected Scheduler getScheduler() { protected Scheduler getScheduler() throws JMSException {
return this.scheduler; Scheduler result = scheduler;
if (result == null) {
synchronized (this) {
result = scheduler;
if (result == null) {
checkClosed();
try {
result = scheduler = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
scheduler.start();
} catch(Exception e) {
throw JMSExceptionSupport.create(e);
}
}
}
}
return result;
} }
protected ThreadPoolExecutor getExecutor() { protected ThreadPoolExecutor getExecutor() {

View File

@ -29,6 +29,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.jms.IllegalStateException; import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException; import javax.jms.InvalidDestinationException;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -36,6 +37,7 @@ import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import javax.jms.TransactionRolledBackException; import javax.jms.TransactionRolledBackException;
import org.apache.activemq.blob.BlobDownloader; import org.apache.activemq.blob.BlobDownloader;
import org.apache.activemq.command.ActiveMQBlobMessage; import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -54,7 +56,6 @@ import org.apache.activemq.management.JMSConsumerStatsImpl;
import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl; import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback; import org.apache.activemq.util.Callback;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
@ -109,7 +110,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class); private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
protected final Scheduler scheduler;
protected final ActiveMQSession session; protected final ActiveMQSession session;
protected final ConsumerInfo info; protected final ConsumerInfo info;
@ -207,7 +207,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
this.session = session; this.session = session;
this.scheduler = session.getScheduler();
this.redeliveryPolicy = session.connection.getRedeliveryPolicy(); this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
setTransformer(session.getTransformer()); setTransformer(session.getTransformer());
@ -1192,7 +1191,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
new LinkedList<MessageDispatch>(deliveredMessages); new LinkedList<MessageDispatch>(deliveredMessages);
// Start up the delivery again a little later. // Start up the delivery again a little later.
scheduler.executeAfterDelay(new Runnable() { session.getScheduler().executeAfterDelay(new Runnable() {
public void run() { public void run() {
try { try {
if (!unconsumedMessages.isClosed()) { if (!unconsumedMessages.isClosed()) {
@ -1216,7 +1215,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) { if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
// Start up the delivery again a little later. // Start up the delivery again a little later.
scheduler.executeAfterDelay(new Runnable() { session.getScheduler().executeAfterDelay(new Runnable() {
public void run() { public void run() {
try { try {
if (started.get()) { if (started.get()) {

View File

@ -26,6 +26,7 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.IllegalStateException; import javax.jms.IllegalStateException;
@ -53,6 +54,7 @@ import javax.jms.TopicPublisher;
import javax.jms.TopicSession; import javax.jms.TopicSession;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
import javax.jms.TransactionRolledBackException; import javax.jms.TransactionRolledBackException;
import org.apache.activemq.blob.BlobDownloader; import org.apache.activemq.blob.BlobDownloader;
import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.blob.BlobUploader; import org.apache.activemq.blob.BlobUploader;
@ -198,7 +200,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
} }
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class); private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
private final Scheduler scheduler;
private final ThreadPoolExecutor connectionExecutor; private final ThreadPoolExecutor connectionExecutor;
protected int acknowledgementMode; protected int acknowledgementMode;
@ -251,7 +252,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
this.connection.asyncSendPacket(info); this.connection.asyncSendPacket(info);
setTransformer(connection.getTransformer()); setTransformer(connection.getTransformer());
setBlobTransferPolicy(connection.getBlobTransferPolicy()); setBlobTransferPolicy(connection.getBlobTransferPolicy());
this.scheduler=connection.getScheduler();
this.connectionExecutor=connection.getExecutor(); this.connectionExecutor=connection.getExecutor();
this.executor = new ActiveMQSessionExecutor(this); this.executor = new ActiveMQSessionExecutor(this);
connection.addSession(this); connection.addSession(this);
@ -659,10 +659,14 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
// //
for (final ActiveMQMessageConsumer consumer : consumers) { for (final ActiveMQMessageConsumer consumer : consumers) {
consumer.inProgressClearRequired(); consumer.inProgressClearRequired();
scheduler.executeAfterDelay(new Runnable() { try {
public void run() { connection.getScheduler().executeAfterDelay(new Runnable() {
consumer.clearMessagesInProgress(); public void run() {
}}, 0l); consumer.clearMessagesInProgress();
}}, 0l);
} catch (JMSException e) {
connection.onClientInternalException(e);
}
} }
} }
@ -892,7 +896,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
for (int i = 0; i < redeliveryCounter; i++) { for (int i = 0; i < redeliveryCounter; i++) {
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
} }
scheduler.executeAfterDelay(new Runnable() { connection.getScheduler().executeAfterDelay(new Runnable() {
public void run() { public void run() {
((ActiveMQDispatcher)md.getConsumer()).dispatch(md); ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
@ -2051,8 +2055,8 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
} }
} }
protected Scheduler getScheduler() { protected Scheduler getScheduler() throws JMSException {
return this.scheduler; return this.connection.getScheduler();
} }
protected ThreadPoolExecutor getConnectionExecutor() { protected ThreadPoolExecutor getConnectionExecutor() {