diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java b/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java index d17b12d45b..29ed0778c0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java @@ -25,17 +25,7 @@ import java.util.TimerTask; */ public final class Scheduler { - private static final class SchedulerTimerTask extends TimerTask { - private final Runnable task; - - private SchedulerTimerTask(Runnable task) { - this.task = task; - } - - public void run() { - task.run(); - } - } + public static final Timer CLOCK_DAEMON = new Timer("ActiveMQ Scheduler", true); private static final HashMap TIMER_TASKS = new HashMap(); diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/SchedulerTimerTask.java b/activemq-core/src/main/java/org/apache/activemq/thread/SchedulerTimerTask.java new file mode 100644 index 0000000000..95cbdb1834 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/thread/SchedulerTimerTask.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.thread; + +import java.util.TimerTask; + +/** + * A TimeTask for a Runnable object + * + */ +public class SchedulerTimerTask extends TimerTask { + private final Runnable task; + + public SchedulerTimerTask(Runnable task) { + this.task = task; + } + + public void run() { + this.task.run(); + } +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 974c6b324c..8256c19edd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -17,11 +17,17 @@ package org.apache.activemq.transport; import java.io.IOException; +import java.util.Timer; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.thread.Scheduler; +import org.apache.activemq.thread.SchedulerTimerTask; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,7 +40,9 @@ import org.apache.commons.logging.LogFactory; public class InactivityMonitor extends TransportFilter { private static final Log LOG = LogFactory.getLog(InactivityMonitor.class); - + private static final ThreadPoolExecutor ASYNC_TASKS; + private static final Timer READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck"); + private static final Timer WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck"); private WireFormatInfo localWireFormatInfo; private WireFormatInfo remoteWireFormatInfo; private final AtomicBoolean monitorStarted = new AtomicBoolean(false); @@ -44,6 +52,8 @@ public class InactivityMonitor extends TransportFilter { private final AtomicBoolean commandReceived = new AtomicBoolean(true); private final AtomicBoolean inReceive = new AtomicBoolean(false); + private SchedulerTimerTask writeCheckerTask; + private SchedulerTimerTask readCheckerTask; private final Runnable readChecker = new Runnable() { long lastRunTime; @@ -51,6 +61,7 @@ public class InactivityMonitor extends TransportFilter { long now = System.currentTimeMillis(); if( lastRunTime != 0 && LOG.isDebugEnabled() ) { LOG.debug(""+(now-lastRunTime)+" ms elapsed since last read check."); + } lastRunTime = now; readCheck(); @@ -62,7 +73,8 @@ public class InactivityMonitor extends TransportFilter { public void run() { long now = System.currentTimeMillis(); if( lastRunTime != 0 && LOG.isDebugEnabled() ) { - LOG.debug(""+(now-lastRunTime)+" ms elapsed since last read check."); + LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check."); + } lastRunTime = now; writeCheck(); @@ -80,14 +92,17 @@ public class InactivityMonitor extends TransportFilter { final void writeCheck() { if (inSend.get()) { - LOG.trace("A send is in progress"); + if (LOG.isTraceEnabled()) { + LOG.trace("A send is in progress"); + } return; } if (!commandSent.get()) { - LOG.trace("No message sent since last write check, sending a KeepAliveInfo"); - // TODO: use a thread pool for this.. - Thread thread = new Thread("ActiveMQ: Activity Generator: "+next.getRemoteAddress()) { + if(LOG.isTraceEnabled()) { + LOG.trace("No message sent since last write check, sending a KeepAliveInfo"); + } + ASYNC_TASKS.execute(new Runnable() { public void run() { try { oneway(new KeepAliveInfo()); @@ -95,11 +110,11 @@ public class InactivityMonitor extends TransportFilter { onException(e); } }; - }; - thread.setDaemon(true); - thread.start(); + }); } else { - LOG.trace("Message sent since last write check, resetting flag"); + if (LOG.isTraceEnabled()) { + LOG.trace("Message sent since last write check, resetting flag"); + } } commandSent.set(false); @@ -107,29 +122,34 @@ public class InactivityMonitor extends TransportFilter { final void readCheck() { if (inReceive.get()) { - LOG.trace("A receive is in progress"); + if (LOG.isTraceEnabled()) { + LOG.trace("A receive is in progress"); + } return; } - if (!commandReceived.get()) { - LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); + if (LOG.isDebugEnabled()) { + LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); + } + // TODO: use a thread pool for this.. - Thread thread = new Thread("ActiveMQ: Inactivity Handler: "+next.getRemoteAddress()) { + ASYNC_TASKS.execute(new Runnable() { public void run() { onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress())); }; - }; - thread.setDaemon(true); - thread.start(); + }); } else { - LOG.trace("Message received since last read check, resetting flag: "); + if (LOG.isTraceEnabled()) { + LOG.trace("Message received since last read check, resetting flag: "); + } } commandReceived.set(false); } public void onCommand(Object command) { + commandReceived.set(true); inReceive.set(true); try { if (command.getClass() == WireFormatInfo.class) { @@ -150,7 +170,7 @@ public class InactivityMonitor extends TransportFilter { transportListener.onCommand(command); } } finally { - commandReceived.set(true); + inReceive.set(false); } } @@ -192,11 +212,14 @@ public class InactivityMonitor extends TransportFilter { return; } - long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration()); - if (l > 0) { + long checkTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration()); + if (checkTime > 0) { monitorStarted.set(true); - Scheduler.executePeriodically(writeChecker, l / 2); - Scheduler.executePeriodically(readChecker, l); + writeCheckerTask = new SchedulerTimerTask(writeChecker); + readCheckerTask = new SchedulerTimerTask(readChecker); + long writeCheckTime = checkTime/3; + WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, writeCheckTime,writeCheckTime); + READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, checkTime,checkTime); } } @@ -205,9 +228,22 @@ public class InactivityMonitor extends TransportFilter { */ private synchronized void stopMonitorThreads() { if (monitorStarted.compareAndSet(true, false)) { - Scheduler.cancel(readChecker); - Scheduler.cancel(writeChecker); + readCheckerTask.cancel(); + writeCheckerTask.cancel(); + WRITE_CHECK_TIMER.purge(); + READ_CHECK_TIMER.purge(); } } + + + static { + ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable); + thread.setDaemon(true); + return thread; + } + }); + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java index 0377d7c986..ec1470e714 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java @@ -47,7 +47,7 @@ public class SimpleTopicTest extends TestCase { protected int samepleCount = 20; protected long sampleInternal = 10000; protected int numberOfConsumers = 1; - protected int numberofProducers = 2; + protected int numberofProducers = 0; protected int playloadSize = 1024; protected byte[] array; protected ConnectionFactory factory; @@ -164,8 +164,12 @@ public class SimpleTopicTest extends TestCase { totalRate += rate.getRate(); totalCount += rate.getTotalCount(); } - int avgRate = totalRate / producers.length; - System.out.println("Avg producer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", sent = " + totalCount); + if (producers != null && producers.length > 0) { + int avgRate = totalRate / producers.length; + System.out.println("Avg producer rate = " + avgRate + + " msg/sec | Total rate = " + totalRate + ", sent = " + + totalCount); + } } protected void dumpConsumerRate() {