git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@605698 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-12-19 21:09:40 +00:00
parent 8228663f6d
commit de14440109
4 changed files with 104 additions and 39 deletions

View File

@ -25,17 +25,7 @@ import java.util.TimerTask;
*/ */
public final class Scheduler { public final class Scheduler {
private static final class SchedulerTimerTask extends TimerTask {
private final Runnable task;
private SchedulerTimerTask(Runnable task) {
this.task = task;
}
public void run() {
task.run();
}
}
public static final Timer CLOCK_DAEMON = new Timer("ActiveMQ Scheduler", true); public static final Timer CLOCK_DAEMON = new Timer("ActiveMQ Scheduler", true);
private static final HashMap<Runnable, TimerTask> TIMER_TASKS = new HashMap<Runnable, TimerTask>(); private static final HashMap<Runnable, TimerTask> TIMER_TASKS = new HashMap<Runnable, TimerTask>();

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.thread;
import java.util.TimerTask;
/**
* A TimeTask for a Runnable object
*
*/
public class SchedulerTimerTask extends TimerTask {
private final Runnable task;
public SchedulerTimerTask(Runnable task) {
this.task = task;
}
public void run() {
this.task.run();
}
}

View File

@ -17,11 +17,17 @@
package org.apache.activemq.transport; package org.apache.activemq.transport;
import java.io.IOException; import java.io.IOException;
import java.util.Timer;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.SchedulerTimerTask;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -34,7 +40,9 @@ import org.apache.commons.logging.LogFactory;
public class InactivityMonitor extends TransportFilter { public class InactivityMonitor extends TransportFilter {
private static final Log LOG = LogFactory.getLog(InactivityMonitor.class); private static final Log LOG = LogFactory.getLog(InactivityMonitor.class);
private static final ThreadPoolExecutor ASYNC_TASKS;
private static final Timer READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck");
private static final Timer WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck");
private WireFormatInfo localWireFormatInfo; private WireFormatInfo localWireFormatInfo;
private WireFormatInfo remoteWireFormatInfo; private WireFormatInfo remoteWireFormatInfo;
private final AtomicBoolean monitorStarted = new AtomicBoolean(false); private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
@ -44,6 +52,8 @@ public class InactivityMonitor extends TransportFilter {
private final AtomicBoolean commandReceived = new AtomicBoolean(true); private final AtomicBoolean commandReceived = new AtomicBoolean(true);
private final AtomicBoolean inReceive = new AtomicBoolean(false); private final AtomicBoolean inReceive = new AtomicBoolean(false);
private SchedulerTimerTask writeCheckerTask;
private SchedulerTimerTask readCheckerTask;
private final Runnable readChecker = new Runnable() { private final Runnable readChecker = new Runnable() {
long lastRunTime; long lastRunTime;
@ -51,6 +61,7 @@ public class InactivityMonitor extends TransportFilter {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
if( lastRunTime != 0 && LOG.isDebugEnabled() ) { if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
LOG.debug(""+(now-lastRunTime)+" ms elapsed since last read check."); LOG.debug(""+(now-lastRunTime)+" ms elapsed since last read check.");
} }
lastRunTime = now; lastRunTime = now;
readCheck(); readCheck();
@ -62,7 +73,8 @@ public class InactivityMonitor extends TransportFilter {
public void run() { public void run() {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
if( lastRunTime != 0 && LOG.isDebugEnabled() ) { if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
LOG.debug(""+(now-lastRunTime)+" ms elapsed since last read check."); LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check.");
} }
lastRunTime = now; lastRunTime = now;
writeCheck(); writeCheck();
@ -80,14 +92,17 @@ public class InactivityMonitor extends TransportFilter {
final void writeCheck() { final void writeCheck() {
if (inSend.get()) { if (inSend.get()) {
LOG.trace("A send is in progress"); if (LOG.isTraceEnabled()) {
LOG.trace("A send is in progress");
}
return; return;
} }
if (!commandSent.get()) { if (!commandSent.get()) {
LOG.trace("No message sent since last write check, sending a KeepAliveInfo"); if(LOG.isTraceEnabled()) {
// TODO: use a thread pool for this.. LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
Thread thread = new Thread("ActiveMQ: Activity Generator: "+next.getRemoteAddress()) { }
ASYNC_TASKS.execute(new Runnable() {
public void run() { public void run() {
try { try {
oneway(new KeepAliveInfo()); oneway(new KeepAliveInfo());
@ -95,11 +110,11 @@ public class InactivityMonitor extends TransportFilter {
onException(e); onException(e);
} }
}; };
}; });
thread.setDaemon(true);
thread.start();
} else { } else {
LOG.trace("Message sent since last write check, resetting flag"); if (LOG.isTraceEnabled()) {
LOG.trace("Message sent since last write check, resetting flag");
}
} }
commandSent.set(false); commandSent.set(false);
@ -107,29 +122,34 @@ public class InactivityMonitor extends TransportFilter {
final void readCheck() { final void readCheck() {
if (inReceive.get()) { if (inReceive.get()) {
LOG.trace("A receive is in progress"); if (LOG.isTraceEnabled()) {
LOG.trace("A receive is in progress");
}
return; return;
} }
if (!commandReceived.get()) { if (!commandReceived.get()) {
LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); if (LOG.isDebugEnabled()) {
LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
}
// TODO: use a thread pool for this.. // TODO: use a thread pool for this..
Thread thread = new Thread("ActiveMQ: Inactivity Handler: "+next.getRemoteAddress()) { ASYNC_TASKS.execute(new Runnable() {
public void run() { public void run() {
onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress())); onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
}; };
}; });
thread.setDaemon(true);
thread.start();
} else { } else {
LOG.trace("Message received since last read check, resetting flag: "); if (LOG.isTraceEnabled()) {
LOG.trace("Message received since last read check, resetting flag: ");
}
} }
commandReceived.set(false); commandReceived.set(false);
} }
public void onCommand(Object command) { public void onCommand(Object command) {
commandReceived.set(true);
inReceive.set(true); inReceive.set(true);
try { try {
if (command.getClass() == WireFormatInfo.class) { if (command.getClass() == WireFormatInfo.class) {
@ -150,7 +170,7 @@ public class InactivityMonitor extends TransportFilter {
transportListener.onCommand(command); transportListener.onCommand(command);
} }
} finally { } finally {
commandReceived.set(true);
inReceive.set(false); inReceive.set(false);
} }
} }
@ -192,11 +212,14 @@ public class InactivityMonitor extends TransportFilter {
return; return;
} }
long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration()); long checkTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
if (l > 0) { if (checkTime > 0) {
monitorStarted.set(true); monitorStarted.set(true);
Scheduler.executePeriodically(writeChecker, l / 2); writeCheckerTask = new SchedulerTimerTask(writeChecker);
Scheduler.executePeriodically(readChecker, l); readCheckerTask = new SchedulerTimerTask(readChecker);
long writeCheckTime = checkTime/3;
WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, writeCheckTime,writeCheckTime);
READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, checkTime,checkTime);
} }
} }
@ -205,9 +228,22 @@ public class InactivityMonitor extends TransportFilter {
*/ */
private synchronized void stopMonitorThreads() { private synchronized void stopMonitorThreads() {
if (monitorStarted.compareAndSet(true, false)) { if (monitorStarted.compareAndSet(true, false)) {
Scheduler.cancel(readChecker); readCheckerTask.cancel();
Scheduler.cancel(writeChecker); writeCheckerTask.cancel();
WRITE_CHECK_TIMER.purge();
READ_CHECK_TIMER.purge();
} }
} }
static {
ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
thread.setDaemon(true);
return thread;
}
});
}
} }

View File

@ -47,7 +47,7 @@ public class SimpleTopicTest extends TestCase {
protected int samepleCount = 20; protected int samepleCount = 20;
protected long sampleInternal = 10000; protected long sampleInternal = 10000;
protected int numberOfConsumers = 1; protected int numberOfConsumers = 1;
protected int numberofProducers = 2; protected int numberofProducers = 0;
protected int playloadSize = 1024; protected int playloadSize = 1024;
protected byte[] array; protected byte[] array;
protected ConnectionFactory factory; protected ConnectionFactory factory;
@ -164,8 +164,12 @@ public class SimpleTopicTest extends TestCase {
totalRate += rate.getRate(); totalRate += rate.getRate();
totalCount += rate.getTotalCount(); totalCount += rate.getTotalCount();
} }
int avgRate = totalRate / producers.length; if (producers != null && producers.length > 0) {
System.out.println("Avg producer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", sent = " + totalCount); int avgRate = totalRate / producers.length;
System.out.println("Avg producer rate = " + avgRate
+ " msg/sec | Total rate = " + totalRate + ", sent = "
+ totalCount);
}
} }
protected void dumpConsumerRate() { protected void dumpConsumerRate() {