git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1157238 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-08-12 20:29:29 +00:00
parent 9026274532
commit 0885c60c4d
34 changed files with 2079 additions and 947 deletions

View File

@ -374,6 +374,23 @@
</reporting>
<build>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
<includes>
<include>**/*</include>
</includes>
</resource>
<resource>
<directory>${project.basedir}/src/main/filtered-resources</directory>
<filtering>true</filtering>
<includes>
<include>**/*</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>

View File

@ -0,0 +1 @@
${project.version}

View File

@ -0,0 +1,372 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport;
import java.io.IOException;
import java.util.Timer;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.thread.SchedulerTimerTask;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Used to make sure that commands are arriving periodically from the peer of
* the transport.
*/
public abstract class AbstractInactivityMonitor extends TransportFilter {
private static final Logger LOG = LoggerFactory.getLogger(AbstractInactivityMonitor.class);
private static ThreadPoolExecutor ASYNC_TASKS;
private static int CHECKER_COUNTER;
private static long DEFAULT_CHECK_TIME_MILLS = 30000;
private static Timer READ_CHECK_TIMER;
private static Timer WRITE_CHECK_TIMER;
private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
private final AtomicBoolean commandSent = new AtomicBoolean(false);
private final AtomicBoolean inSend = new AtomicBoolean(false);
private final AtomicBoolean failed = new AtomicBoolean(false);
private final AtomicBoolean commandReceived = new AtomicBoolean(true);
private final AtomicBoolean inReceive = new AtomicBoolean(false);
private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
private SchedulerTimerTask writeCheckerTask;
private SchedulerTimerTask readCheckerTask;
private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
private boolean useKeepAlive = true;
private boolean keepAliveResponseRequired;
protected WireFormat wireFormat;
private final Runnable readChecker = new Runnable() {
long lastRunTime;
public void run() {
long now = System.currentTimeMillis();
long elapsed = (now-lastRunTime);
if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
LOG.debug(""+elapsed+" ms elapsed since last read check.");
}
// Perhaps the timer executed a read check late.. and then executes
// the next read check on time which causes the time elapsed between
// read checks to be small..
// If less than 90% of the read check Time elapsed then abort this readcheck.
if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression.
LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
return;
}
lastRunTime = now;
readCheck();
}
};
private boolean allowReadCheck(long elapsed) {
return elapsed > (readCheckTime * 9 / 10);
}
private final Runnable writeChecker = new Runnable() {
long lastRunTime;
public void run() {
long now = System.currentTimeMillis();
if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check.");
}
lastRunTime = now;
writeCheck();
}
};
public AbstractInactivityMonitor(Transport next, WireFormat wireFormat) {
super(next);
this.wireFormat = wireFormat;
}
public void start() throws Exception {
next.start();
startMonitorThreads();
}
public void stop() throws Exception {
stopMonitorThreads();
next.stop();
}
final void writeCheck() {
if (inSend.get()) {
if (LOG.isTraceEnabled()) {
LOG.trace("A send is in progress");
}
return;
}
if (!commandSent.get() && useKeepAlive) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
}
ASYNC_TASKS.execute(new Runnable() {
public void run() {
if (monitorStarted.get()) {
try {
KeepAliveInfo info = new KeepAliveInfo();
info.setResponseRequired(keepAliveResponseRequired);
oneway(info);
} catch (IOException e) {
onException(e);
}
}
};
});
} else {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " message sent since last write check, resetting flag");
}
}
commandSent.set(false);
}
final void readCheck() {
int currentCounter = next.getReceiveCounter();
int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
if (inReceive.get() || currentCounter!=previousCounter ) {
if (LOG.isTraceEnabled()) {
LOG.trace("A receive is in progress");
}
return;
}
if (!commandReceived.get()) {
if (LOG.isDebugEnabled()) {
LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
}
ASYNC_TASKS.execute(new Runnable() {
public void run() {
onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
};
});
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Message received since last read check, resetting flag: ");
}
}
commandReceived.set(false);
}
protected abstract void processInboundWireFormatInfo(WireFormatInfo info) throws IOException;
protected abstract void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException;
public void onCommand(Object command) {
commandReceived.set(true);
inReceive.set(true);
try {
if (command.getClass() == KeepAliveInfo.class) {
KeepAliveInfo info = (KeepAliveInfo) command;
if (info.isResponseRequired()) {
try {
info.setResponseRequired(false);
oneway(info);
} catch (IOException e) {
onException(e);
}
}
} else {
if (command.getClass() == WireFormatInfo.class) {
synchronized (this) {
try {
processInboundWireFormatInfo((WireFormatInfo) command);
} catch (IOException e) {
onException(e);
}
}
}
synchronized (readChecker) {
transportListener.onCommand(command);
}
}
} finally {
inReceive.set(false);
}
}
public void oneway(Object o) throws IOException {
// Disable inactivity monitoring while processing a command.
// synchronize this method - its not synchronized
// further down the transport stack and gets called by more
// than one thread by this class
synchronized(inSend) {
inSend.set(true);
try {
if( failed.get() ) {
throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
}
if (o.getClass() == WireFormatInfo.class) {
synchronized (this) {
processOutboundWireFormatInfo((WireFormatInfo) o);
}
}
next.oneway(o);
} finally {
commandSent.set(true);
inSend.set(false);
}
}
}
public void onException(IOException error) {
if (failed.compareAndSet(false, true)) {
stopMonitorThreads();
transportListener.onException(error);
}
}
public void setUseKeepAlive(boolean val) {
useKeepAlive = val;
}
public long getReadCheckTime() {
return readCheckTime;
}
public void setReadCheckTime(long readCheckTime) {
this.readCheckTime = readCheckTime;
}
public long getWriteCheckTime() {
return writeCheckTime;
}
public void setWriteCheckTime(long writeCheckTime) {
this.writeCheckTime = writeCheckTime;
}
public long getInitialDelayTime() {
return initialDelayTime;
}
public void setInitialDelayTime(long initialDelayTime) {
this.initialDelayTime = initialDelayTime;
}
public boolean isKeepAliveResponseRequired() {
return this.keepAliveResponseRequired;
}
public void setKeepAliveResponseRequired(boolean value) {
this.keepAliveResponseRequired = value;
}
public boolean isMonitorStarted() {
return this.monitorStarted.get();
}
protected synchronized void startMonitorThreads() throws IOException {
if (monitorStarted.get()) {
return;
}
if (!configuredOk()) {
return;
}
if (readCheckTime > 0) {
readCheckerTask = new SchedulerTimerTask(readChecker);
}
if (writeCheckTime > 0) {
writeCheckerTask = new SchedulerTimerTask(writeChecker);
}
if (writeCheckTime > 0 || readCheckTime > 0) {
monitorStarted.set(true);
synchronized(AbstractInactivityMonitor.class) {
if( CHECKER_COUNTER == 0 ) {
ASYNC_TASKS = createExecutor();
READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
}
CHECKER_COUNTER++;
if (readCheckTime > 0) {
READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
}
if (writeCheckTime > 0) {
WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime);
}
}
}
}
abstract protected boolean configuredOk() throws IOException;
protected synchronized void stopMonitorThreads() {
if (monitorStarted.compareAndSet(true, false)) {
if (readCheckerTask != null) {
readCheckerTask.cancel();
}
if (writeCheckerTask != null) {
writeCheckerTask.cancel();
}
synchronized( AbstractInactivityMonitor.class ) {
WRITE_CHECK_TIMER.purge();
READ_CHECK_TIMER.purge();
CHECKER_COUNTER--;
if(CHECKER_COUNTER==0) {
WRITE_CHECK_TIMER.cancel();
READ_CHECK_TIMER.cancel();
WRITE_CHECK_TIMER = null;
READ_CHECK_TIMER = null;
ASYNC_TASKS.shutdownNow();
ASYNC_TASKS = null;
}
}
}
}
private ThreadFactory factory = new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
thread.setDaemon(true);
return thread;
}
};
private ThreadPoolExecutor createExecutor() {
ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
exec.allowCoreThreadTimeOut(true);
return exec;
}
}

View File

@ -17,17 +17,8 @@
package org.apache.activemq.transport;
import java.io.IOException;
import java.util.Timer;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.thread.SchedulerTimerTask;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -35,184 +26,27 @@ import org.slf4j.LoggerFactory;
/**
* Used to make sure that commands are arriving periodically from the peer of
* the transport.
*
*
*/
public class InactivityMonitor extends TransportFilter {
public class InactivityMonitor extends AbstractInactivityMonitor {
private static final Logger LOG = LoggerFactory.getLogger(InactivityMonitor.class);
private static ThreadPoolExecutor ASYNC_TASKS;
private static int CHECKER_COUNTER;
private static long DEFAULT_CHECK_TIME_MILLS = 30000;
private static Timer READ_CHECK_TIMER;
private static Timer WRITE_CHECK_TIMER;
private WireFormatInfo localWireFormatInfo;
private WireFormatInfo remoteWireFormatInfo;
private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
private final AtomicBoolean commandSent = new AtomicBoolean(false);
private final AtomicBoolean inSend = new AtomicBoolean(false);
private final AtomicBoolean failed = new AtomicBoolean(false);
private final AtomicBoolean commandReceived = new AtomicBoolean(true);
private final AtomicBoolean inReceive = new AtomicBoolean(false);
private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
private SchedulerTimerTask writeCheckerTask;
private SchedulerTimerTask readCheckerTask;
private boolean ignoreRemoteWireFormat = false;
private boolean ignoreAllWireFormatInfo = false;
private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
private boolean useKeepAlive = true;
private boolean keepAliveResponseRequired;
private WireFormat wireFormat;
private final Runnable readChecker = new Runnable() {
long lastRunTime;
public void run() {
long now = System.currentTimeMillis();
long elapsed = (now-lastRunTime);
if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
LOG.debug(""+elapsed+" ms elapsed since last read check.");
}
// Perhaps the timer executed a read check late.. and then executes
// the next read check on time which causes the time elapsed between
// read checks to be small..
// If less than 90% of the read check Time elapsed then abort this readcheck.
if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression.
LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
return;
}
lastRunTime = now;
readCheck();
}
};
private boolean allowReadCheck(long elapsed) {
return elapsed > (readCheckTime * 9 / 10);
}
private final Runnable writeChecker = new Runnable() {
long lastRunTime;
public void run() {
long now = System.currentTimeMillis();
if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check.");
}
lastRunTime = now;
writeCheck();
}
};
public InactivityMonitor(Transport next, WireFormat wireFormat) {
super(next);
this.wireFormat = wireFormat;
super(next, wireFormat);
if (this.wireFormat == null) {
this.ignoreAllWireFormatInfo = true;
}
}
public void start() throws Exception {
next.start();
startMonitorThreads();
}
public void stop() throws Exception {
stopMonitorThreads();
next.stop();
}
final void writeCheck() {
if (inSend.get()) {
if (LOG.isTraceEnabled()) {
LOG.trace("A send is in progress");
}
return;
}
if (!commandSent.get() && useKeepAlive) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
}
ASYNC_TASKS.execute(new Runnable() {
public void run() {
if (monitorStarted.get()) {
try {
KeepAliveInfo info = new KeepAliveInfo();
info.setResponseRequired(keepAliveResponseRequired);
oneway(info);
} catch (IOException e) {
onException(e);
}
}
};
});
} else {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " message sent since last write check, resetting flag");
}
}
commandSent.set(false);
}
final void readCheck() {
int currentCounter = next.getReceiveCounter();
int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
if (inReceive.get() || currentCounter!=previousCounter ) {
if (LOG.isTraceEnabled()) {
LOG.trace("A receive is in progress");
}
return;
}
if (!commandReceived.get()) {
if (LOG.isDebugEnabled()) {
LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
}
ASYNC_TASKS.execute(new Runnable() {
public void run() {
onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
};
});
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Message received since last read check, resetting flag: ");
}
}
commandReceived.set(false);
}
public void onCommand(Object command) {
commandReceived.set(true);
inReceive.set(true);
try {
if (command.getClass() == KeepAliveInfo.class) {
KeepAliveInfo info = (KeepAliveInfo) command;
if (info.isResponseRequired()) {
try {
info.setResponseRequired(false);
oneway(info);
} catch (IOException e) {
onException(e);
}
}
} else {
if (command.getClass() == WireFormatInfo.class) {
synchronized (this) {
protected void processInboundWireFormatInfo(WireFormatInfo info) throws IOException {
IOException error = null;
remoteWireFormatInfo = (WireFormatInfo) command;
remoteWireFormatInfo = info;
try {
startMonitorThreads();
} catch (IOException e) {
@ -222,106 +56,29 @@ public class InactivityMonitor extends TransportFilter {
onException(error);
}
}
}
synchronized (readChecker) {
transportListener.onCommand(command);
}
}
} finally {
inReceive.set(false);
}
}
public void oneway(Object o) throws IOException {
// Disable inactivity monitoring while processing a command.
//synchronize this method - its not synchronized
//further down the transport stack and gets called by more
//than one thread by this class
synchronized(inSend) {
inSend.set(true);
try {
if( failed.get() ) {
throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
}
if (o.getClass() == WireFormatInfo.class) {
synchronized (this) {
localWireFormatInfo = (WireFormatInfo)o;
protected void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException{
localWireFormatInfo = info;
startMonitorThreads();
}
}
next.oneway(o);
} finally {
commandSent.set(true);
inSend.set(false);
}
}
}
public void onException(IOException error) {
if (failed.compareAndSet(false, true)) {
stopMonitorThreads();
transportListener.onException(error);
}
}
public void setKeepAliveResponseRequired(boolean val) {
keepAliveResponseRequired = val;
}
public void setUseKeepAlive(boolean val) {
useKeepAlive = val;
}
public void setIgnoreRemoteWireFormat(boolean val) {
ignoreRemoteWireFormat = val;
}
public long getReadCheckTime() {
return readCheckTime;
}
public void setReadCheckTime(long readCheckTime) {
this.readCheckTime = readCheckTime;
}
public long getInitialDelayTime() {
return initialDelayTime;
}
public void setInitialDelayTime(long initialDelayTime) {
this.initialDelayTime = initialDelayTime;
}
private synchronized void startMonitorThreads() throws IOException {
if (monitorStarted.get()) {
@Override
protected synchronized void startMonitorThreads() throws IOException {
if (isMonitorStarted()) {
return;
}
if (!configuredOk()) {
return;
}
long readCheckTime = getReadCheckTime();
if (readCheckTime > 0) {
monitorStarted.set(true);
writeCheckerTask = new SchedulerTimerTask(writeChecker);
readCheckerTask = new SchedulerTimerTask(readChecker);
writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
synchronized( InactivityMonitor.class ) {
if( CHECKER_COUNTER == 0 ) {
ASYNC_TASKS = createExecutor();
READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
}
CHECKER_COUNTER++;
WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime);
READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
}
}
setWriteCheckTime(readCheckTime>3 ? readCheckTime/3 : readCheckTime);
}
private boolean configuredOk() throws IOException {
super.startMonitorThreads();
}
@Override
protected boolean configuredOk() throws IOException {
boolean configured = false;
if (ignoreAllWireFormatInfo) {
configured = true;
@ -330,54 +87,29 @@ public class InactivityMonitor extends TransportFilter {
if (LOG.isDebugEnabled()) {
LOG.debug("Using min of local: " + localWireFormatInfo + " and remote: " + remoteWireFormatInfo);
}
readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
long readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
long writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
setReadCheckTime(readCheckTime);
setInitialDelayTime(Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay()));
setWriteCheckTime(writeCheckTime);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Using local: " + localWireFormatInfo);
}
readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay();
long readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
long writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
setReadCheckTime(readCheckTime);
setInitialDelayTime(localWireFormatInfo.getMaxInactivityDurationInitalDelay());
setWriteCheckTime(writeCheckTime);
}
configured = true;
}
return configured;
}
/**
*
*/
private synchronized void stopMonitorThreads() {
if (monitorStarted.compareAndSet(true, false)) {
readCheckerTask.cancel();
writeCheckerTask.cancel();
synchronized( InactivityMonitor.class ) {
WRITE_CHECK_TIMER.purge();
READ_CHECK_TIMER.purge();
CHECKER_COUNTER--;
if(CHECKER_COUNTER==0) {
WRITE_CHECK_TIMER.cancel();
READ_CHECK_TIMER.cancel();
WRITE_CHECK_TIMER = null;
READ_CHECK_TIMER = null;
ASYNC_TASKS.shutdownNow();
ASYNC_TASKS = null;
}
}
}
}
private ThreadFactory factory = new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
thread.setDaemon(true);
return thread;
}
};
private ThreadPoolExecutor createExecutor() {
ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
exec.allowCoreThreadTimeOut(true);
return exec;
}
}

View File

@ -23,8 +23,6 @@ import org.apache.activemq.Service;
/**
* Represents the client side of a transport allowing messages to be sent
* synchronously, asynchronously and consumed.
*
*
*/
public interface Transport extends Service {
@ -67,43 +65,6 @@ public interface Transport extends Service {
*/
Object request(Object command, int timeout) throws IOException;
// /**
// * A one way asynchronous send
// * @param command
// * @throws IOException
// */
// void oneway(Command command) throws IOException;
//
// /**
// * An asynchronous request response where the Receipt will be returned
// * in the future. If responseCallback is not null, then it will be called
// * when the response has been completed.
// *
// * @param command
// * @param responseCallback TODO
// * @return the FutureResponse
// * @throws IOException
// */
// FutureResponse asyncRequest(Command command, ResponseCallback
// responseCallback) throws IOException;
//
// /**
// * A synchronous request response
// * @param command
// * @return the response
// * @throws IOException
// */
// Response request(Command command) throws IOException;
//
// /**
// * A synchronous request response
// * @param command
// * @param timeout
// * @return the repsonse or null if timeout
// * @throws IOException
// */
// Response request(Command command, int timeout) throws IOException;
/**
* Returns the current transport listener
*

View File

@ -242,6 +242,7 @@ public abstract class TransportFactory {
* @return
* @throws Exception
*/
@SuppressWarnings("rawtypes")
public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
transport = compositeConfigure(transport, wf, options);
@ -263,6 +264,7 @@ public abstract class TransportFactory {
* @return
* @throws Exception
*/
@SuppressWarnings("rawtypes")
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
if (options.containsKey(THREAD_NAME_FILTER)) {
transport = new ThreadNameFilter(transport);
@ -282,6 +284,7 @@ public abstract class TransportFactory {
* @param options
* @return
*/
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
transport = new WriteTimeoutFilter(transport);
@ -294,6 +297,7 @@ public abstract class TransportFactory {
return transport;
}
@SuppressWarnings("rawtypes")
protected String getOption(Map options, String key, String def) {
String rc = (String) options.remove(key);
if( rc == null ) {

View File

@ -22,10 +22,7 @@ import java.net.Socket;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
import org.apache.activemq.transport.tcp.TimeStampStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -22,12 +22,9 @@ import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.Message;
/**
* Implementations of this interface are used to map back and forth from Stomp

View File

@ -34,8 +34,6 @@ import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.util.JettisonMappedXmlDriver;
import org.codehaus.jettison.mapped.Configuration;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
@ -57,7 +55,7 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
public ActiveMQMessage convertFrame(ProtocolConverter converter,
StompFrame command) throws JMSException, ProtocolException {
Map headers = command.getHeaders();
Map<String, String> headers = command.getHeaders();
ActiveMQMessage msg;
String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
@ -188,6 +186,7 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
return objMsg;
}
@SuppressWarnings("unchecked")
protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
@ -229,6 +228,7 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
// Implementation methods
// -------------------------------------------------------------------------
@SuppressWarnings("unchecked")
protected XStream createXStream() {
XStream xstream = null;
if (brokerContext != null) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.stomp;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@ -23,10 +24,17 @@ import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.*;
/**
* Implements ActiveMQ 4.0 translations
@ -35,7 +43,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
final Map headers = command.getHeaders();
final Map<?, ?> headers = command.getHeaders();
final ActiveMQMessage msg;
/*
* To reduce the complexity of this method perhaps a Chain of Responsibility
@ -46,7 +54,12 @@ public class LegacyFrameTranslator implements FrameTranslator {
if(intendedType.equalsIgnoreCase("text")){
ActiveMQTextMessage text = new ActiveMQTextMessage();
try {
text.setText(new String(command.getContent(), "UTF-8"));
//text.setText(new String(command.getContent(), "UTF-8"));
ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
DataOutputStream data = new DataOutputStream(bytes);
data.writeInt(command.getContent().length);
data.write(command.getContent());
text.setContent(bytes.toByteSequence());
} catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e);
}
@ -66,7 +79,12 @@ public class LegacyFrameTranslator implements FrameTranslator {
} else {
ActiveMQTextMessage text = new ActiveMQTextMessage();
try {
text.setText(new String(command.getContent(), "UTF-8"));
//text.setText(new String(command.getContent(), "UTF-8"));
ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
DataOutputStream data = new DataOutputStream(bytes);
data.writeInt(command.getContent().length);
data.write(command.getContent());
text.setContent(bytes.toByteSequence());
} catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e);
}
@ -86,8 +104,17 @@ public class LegacyFrameTranslator implements FrameTranslator {
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
if (!message.isCompressed() && message.getContent() != null) {
ByteSequence msgContent = message.getContent();
if (msgContent.getLength() > 4) {
byte[] content = new byte[msgContent.getLength() - 4];
System.arraycopy(msgContent.data, 4, content, 0, content.length);
command.setContent(content);
}
} else {
ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
command.setContent(msg.getText().getBytes("UTF-8"));
}
} else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
@ -96,7 +123,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
byte[] data = new byte[(int)msg.getBodyLength()];
msg.readBytes(data);
headers.put(Stomp.Headers.CONTENT_LENGTH, "" + data.length);
headers.put(Stomp.Headers.CONTENT_LENGTH, Integer.toString(data.length));
command.setContent(data);
} else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
@ -122,7 +149,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
return rc;
}
StringBuffer buffer = new StringBuffer();
StringBuilder buffer = new StringBuilder();
if (activeMQDestination.isQueue()) {
if (activeMQDestination.isTemporary()) {
buffer.append("/remote-temp-queue/");

View File

@ -16,10 +16,16 @@
*/
package org.apache.activemq.transport.stomp;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -34,6 +40,7 @@ import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
@ -62,7 +69,6 @@ import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContextAware;
/**
* @author <a href="http://hiramchirino.com">chirino</a>
@ -73,6 +79,26 @@ public class ProtocolConverter {
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
private static final String BROKER_VERSION;
private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);
private static final long DEFAULT_OUTBOUND_HEARTBEAT = 100;
private static final long DEFAULT_INBOUND_HEARTBEAT = 1000;
private static final long DEFAULT_INITIAL_HEARTBEAT_DELAY = 1000;
static {
InputStream in = null;
String version = "5.6.0";
if ((in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
try {
version = reader.readLine();
} catch(Exception e) {
}
}
BROKER_VERSION = version;
}
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private final SessionId sessionId = new SessionId(connectionId, -1);
private final ProducerId producerId = new ProducerId(sessionId, 1);
@ -84,6 +110,7 @@ public class ProtocolConverter {
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
private final ConcurrentHashMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
@ -92,13 +119,15 @@ public class ProtocolConverter {
private final Object commnadIdMutex = new Object();
private int lastCommandId;
private final AtomicBoolean connected = new AtomicBoolean(false);
private final FrameTranslator frameTranslator;
private final FrameTranslator frameTranslator = new LegacyFrameTranslator();
private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
private final BrokerContext brokerContext;
private String version = "1.0";
private long hbReadInterval = DEFAULT_INBOUND_HEARTBEAT;
private long hbWriteInterval = DEFAULT_OUTBOUND_HEARTBEAT;
public ProtocolConverter(StompTransport stompTransport, FrameTranslator translator, BrokerContext brokerContext) {
public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
this.stompTransport = stompTransport;
this.frameTranslator = translator;
this.brokerContext = brokerContext;
}
@ -178,6 +207,8 @@ public class ProtocolConverter {
onStompSend(command);
} else if (action.startsWith(Stomp.Commands.ACK)) {
onStompAck(command);
} else if (action.startsWith(Stomp.Commands.NACK)) {
onStompNack(command);
} else if (action.startsWith(Stomp.Commands.BEGIN)) {
onStompBegin(command);
} else if (action.startsWith(Stomp.Commands.COMMIT)) {
@ -188,7 +219,8 @@ public class ProtocolConverter {
onStompSubscribe(command);
} else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
onStompUnsubscribe(command);
} else if (action.startsWith(Stomp.Commands.CONNECT)) {
} else if (action.startsWith(Stomp.Commands.CONNECT) ||
action.startsWith(Stomp.Commands.STOMP)) {
onStompConnect(command);
} else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
onStompDisconnect(command);
@ -199,7 +231,7 @@ public class ProtocolConverter {
} catch (ProtocolException e) {
handleException(e, command);
// Some protocol errors can cause the connection to get closed.
if( e.isFatal() ) {
if (e.isFatal()) {
getStompTransport().onException(e);
}
}
@ -219,6 +251,7 @@ public class ProtocolConverter {
HashMap<String, String> headers = new HashMap<String, String>();
headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
if (command != null) {
final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
@ -235,6 +268,11 @@ public class ProtocolConverter {
checkConnected();
Map<String, String> headers = command.getHeaders();
String destination = headers.get(Stomp.Headers.Send.DESTINATION);
if (destination == null) {
throw new ProtocolException("SEND received without a Destination specified!");
}
String stompTx = headers.get(Stomp.Headers.TRANSACTION);
headers.remove("transaction");
@ -255,24 +293,64 @@ public class ProtocolConverter {
message.onSend();
sendToActiveMQ(message, createResponseHandler(command));
}
protected void onStompNack(StompFrame command) throws ProtocolException {
checkConnected();
if (this.version.equals(Stomp.V1_1)) {
throw new ProtocolException("NACK received but connection is in v1.0 mode.");
}
Map<String, String> headers = command.getHeaders();
String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
if (subscriptionId == null) {
throw new ProtocolException("NACK received without a subscription id for acknowledge!");
}
String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
if (messageId == null) {
throw new ProtocolException("NACK received without a message-id to acknowledge!");
}
TransactionId activemqTx = null;
String stompTx = headers.get(Stomp.Headers.TRANSACTION);
if (stompTx != null) {
activemqTx = transactions.get(stompTx);
if (activemqTx == null) {
throw new ProtocolException("Invalid transaction id: " + stompTx);
}
}
if (subscriptionId != null) {
StompSubscription sub = this.subscriptions.get(subscriptionId);
if (sub != null) {
MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
if (ack != null) {
sendToActiveMQ(ack, createResponseHandler(command));
} else {
throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
}
}
}
}
protected void onStompAck(StompFrame command) throws ProtocolException {
checkConnected();
// TODO: acking with just a message id is very bogus
// since the same message id could have been sent to 2 different
// subscriptions
// on the same stomp connection. For example, when 2 subs are created on
// the same topic.
Map<String, String> headers = command.getHeaders();
String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
if (messageId == null) {
throw new ProtocolException("ACK received without a message-id to acknowledge!");
}
String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
throw new ProtocolException("ACK received without a subscription id for acknowledge!");
}
TransactionId activemqTx = null;
String stompTx = headers.get(Stomp.Headers.TRANSACTION);
if (stompTx != null) {
@ -283,21 +361,37 @@ public class ProtocolConverter {
}
boolean acked = false;
for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = iter.next();
if (subscriptionId != null) {
StompSubscription sub = this.subscriptions.get(subscriptionId);
if (sub != null) {
MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
if (ack != null) {
sendToActiveMQ(ack, createResponseHandler(command));
acked = true;
}
}
} else {
// TODO: acking with just a message id is very bogus since the same message id
// could have been sent to 2 different subscriptions on the same Stomp connection.
// For example, when 2 subs are created on the same topic.
for (StompSubscription sub : subscriptionsByConsumerId.values()) {
MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
if (ack != null) {
ack.setTransactionId(activemqTx);
sendToActiveMQ(ack, createResponseHandler(command));
acked = true;
break;
}
}
}
if (!acked) {
throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
}
}
protected void onStompBegin(StompFrame command) throws ProtocolException {
@ -324,7 +418,6 @@ public class ProtocolConverter {
tx.setType(TransactionInfo.BEGIN);
sendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompCommit(StompFrame command) throws ProtocolException {
@ -342,8 +435,7 @@ public class ProtocolConverter {
throw new ProtocolException("Invalid transaction id: " + stompTx);
}
for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = iter.next();
for (StompSubscription sub : subscriptionsByConsumerId.values()) {
sub.onStompCommit(activemqTx);
}
@ -353,7 +445,6 @@ public class ProtocolConverter {
tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
sendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompAbort(StompFrame command) throws ProtocolException {
@ -369,8 +460,7 @@ public class ProtocolConverter {
if (activemqTx == null) {
throw new ProtocolException("Invalid transaction id: " + stompTx);
}
for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = iter.next();
for (StompSubscription sub : subscriptionsByConsumerId.values()) {
try {
sub.onStompAbort(activemqTx);
} catch (Exception e) {
@ -384,7 +474,6 @@ public class ProtocolConverter {
tx.setType(TransactionInfo.ROLLBACK);
sendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompSubscribe(StompFrame command) throws ProtocolException {
@ -395,6 +484,10 @@ public class ProtocolConverter {
String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
throw new ProtocolException("SUBSCRIBE received without a subscription id!");
}
ActiveMQDestination actualDest = translator.convertDestination(this, destination);
if (actualDest == null) {
@ -406,6 +499,16 @@ public class ProtocolConverter {
consumerInfo.setPrefetchSize(1000);
consumerInfo.setDispatchAsync(true);
String browser = headers.get(Stomp.Headers.Subscribe.BROWSER);
if (browser != null && browser.equals(Stomp.TRUE)) {
if (!this.version.equals(Stomp.V1_1)) {
throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1 clients!");
}
consumerInfo.setBrowser(true);
}
String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
consumerInfo.setSelector(selector);
@ -413,7 +516,12 @@ public class ProtocolConverter {
consumerInfo.setDestination(translator.convertDestination(this, destination));
StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
StompSubscription stompSubscription;
if (!consumerInfo.isBrowser()) {
stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
} else {
stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
}
stompSubscription.setDestination(actualDest);
String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
@ -426,8 +534,11 @@ public class ProtocolConverter {
}
subscriptionsByConsumerId.put(id, stompSubscription);
// Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set.
if (subscriptionId != null) {
subscriptions.put(subscriptionId, stompSubscription);
}
sendToActiveMQ(consumerInfo, createResponseHandler(command));
}
protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
@ -441,6 +552,9 @@ public class ProtocolConverter {
}
String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
throw new ProtocolException("UNSUBSCRIBE received without a subscription id!");
}
if (subscriptionId == null && destination == null) {
throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
@ -457,19 +571,27 @@ public class ProtocolConverter {
return;
}
// TODO: Unsubscribing using a destination is a bit wierd if multiple
// subscriptions
// are created with the same destination. Perhaps this should be
// removed.
//
if (subscriptionId != null) {
StompSubscription sub = this.subscriptions.remove(subscriptionId);
if (sub != null) {
sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
return;
}
} else {
// Unsubscribing using a destination is a bit weird if multiple subscriptions
// are created with the same destination.
for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = iter.next();
if ((subscriptionId != null && subscriptionId.equals(sub.getSubscriptionId())) || (destination != null && destination.equals(sub.getDestination()))) {
if (destination != null && destination.equals(sub.getDestination())) {
sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
iter.remove();
return;
}
}
}
throw new ProtocolException("No subscription matched.");
}
@ -488,10 +610,28 @@ public class ProtocolConverter {
String login = headers.get(Stomp.Headers.Connect.LOGIN);
String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
String accepts = headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
if (accepts == null) {
accepts = Stomp.DEFAULT_VERSION;
}
if (heartBeat == null) {
heartBeat = Stomp.DEFAULT_HEART_BEAT;
}
HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.split(Stomp.COMMA)));
acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
if (acceptsVersions.isEmpty()) {
throw new ProtocolException("Invlid Protocol version, supported versions are: " +
Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
} else {
this.version = Collections.max(acceptsVersions);
}
configureInactivityMonitor(heartBeat);
IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
connectionInfo.setConnectionId(connectionId);
if (clientId != null) {
connectionInfo.setClientId(clientId);
@ -544,10 +684,22 @@ public class ProtocolConverter {
responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
}
responseHeaders.put(Stomp.Headers.Connected.VERSION, version);
responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT,
String.format("%d,%d", hbWriteInterval, hbReadInterval));
responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION);
StompFrame sc = new StompFrame();
sc.setAction(Stomp.Responses.CONNECTED);
sc.setHeaders(responseHeaders);
sendToStomp(sc);
if (version.equals(Stomp.V1_1)) {
StompWireFormat format = stompTransport.getWireFormat();
if (format != null) {
format.setEncodingEnabled(true);
}
}
}
});
@ -576,7 +728,6 @@ public class ProtocolConverter {
*/
public void onActiveMQCommand(Command command) throws IOException, JMSException {
if (command.isResponse()) {
Response response = (Response)command;
ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
if (rh != null) {
@ -589,12 +740,13 @@ public class ProtocolConverter {
}
}
} else if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch)command;
StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
if (sub != null) {
sub.onMessageDispatch(md);
}
} else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
stompTransport.sendToStomp(ping);
} else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
// Pass down any unexpected async errors. Should this close the connection?
Throwable exception = ((ConnectionError)command).getException();
@ -643,4 +795,49 @@ public class ProtocolConverter {
public String getCreatedTempDestinationName(ActiveMQDestination destination) {
return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
}
protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException {
String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);
if (keepAliveOpts == null || keepAliveOpts.length != 2) {
throw new ProtocolException("Invlid heart-beat header:" + heartBeatConfig, true);
} else {
try {
hbReadInterval = Long.parseLong(keepAliveOpts[0]);
hbWriteInterval = Long.parseLong(keepAliveOpts[1]);
} catch(NumberFormatException e) {
throw new ProtocolException("Invlid heart-beat header:" + heartBeatConfig, true);
}
if (hbReadInterval > 0) {
hbReadInterval = Math.max(DEFAULT_INBOUND_HEARTBEAT, hbReadInterval);
hbReadInterval += Math.min(hbReadInterval, 5000);
}
if (hbWriteInterval > 0) {
hbWriteInterval = Math.max(DEFAULT_OUTBOUND_HEARTBEAT, hbWriteInterval);
}
try {
StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
monitor.setReadCheckTime(hbReadInterval);
monitor.setInitialDelayTime(DEFAULT_INITIAL_HEARTBEAT_DELAY);
monitor.setWriteCheckTime(hbWriteInterval);
monitor.startMonitoring();
} catch(Exception ex) {
hbReadInterval = 0;
hbWriteInterval = 0;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Stomp Connect heartbeat conf RW[" + hbReadInterval + "," + hbWriteInterval + "]");
}
}
}
}

View File

@ -20,7 +20,29 @@ public interface Stomp {
String NULL = "\u0000";
String NEWLINE = "\n";
byte BREAK = '\n';
byte COLON = ':';
byte ESCAPE = '\\';
byte[] ESCAPE_ESCAPE_SEQ = { 92, 92 };
byte[] COLON_ESCAPE_SEQ = { 92, 99 };
byte[] NEWLINE_ESCAPE_SEQ = { 92, 110 };
String COMMA = ",";
String V1_0 = "1.0";
String V1_1 = "1.1";
String DEFAULT_HEART_BEAT = "0,0";
String DEFAULT_VERSION = "1.0";
String EMPTY = "";
String[] SUPPORTED_PROTOCOL_VERSIONS = {"1.1", "1.0"};
String TEXT_PLAIN = "text/plain";
String TRUE = "true";
String FALSE = "false";
String END = "end";
public static interface Commands {
String STOMP = "STOMP";
String CONNECT = "CONNECT";
String SEND = "SEND";
String DISCONNECT = "DISCONNECT";
@ -34,6 +56,8 @@ public interface Stomp {
String COMMIT = "COMMIT";
String ABORT = "ABORT";
String ACK = "ACK";
String NACK = "NACK";
String KEEPALIVE = "KEEPALIVE";
}
public interface Responses {
@ -48,8 +72,10 @@ public interface Stomp {
String RECEIPT_REQUESTED = "receipt";
String TRANSACTION = "transaction";
String CONTENT_LENGTH = "content-length";
String CONTENT_TYPE = "content-type";
String TRANSFORMATION = "transformation";
String TRANSFORMATION_ERROR = "transformation-error";
/**
* This header is used to instruct ActiveMQ to construct the message
* based with a specific type.
@ -81,6 +107,7 @@ public interface Stomp {
String TIMESTAMP = "timestamp";
String TYPE = "type";
String SUBSCRIPTION = "subscription";
String BROWSER = "browser";
String USERID = "JMSXUserID";
String ORIGINAL_DESTINATION = "original-destination";
}
@ -90,6 +117,7 @@ public interface Stomp {
String ACK_MODE = "ack";
String ID = "id";
String SELECTOR = "selector";
String BROWSER = "browser";
public interface AckModeValues {
String AUTO = "auto";
@ -108,6 +136,9 @@ public interface Stomp {
String PASSCODE = "passcode";
String CLIENT_ID = "client-id";
String REQUEST_ID = "request-id";
String ACCEPT_VERSION = "accept-version";
String HOST = "host";
String HEART_BEAT = "heart-beat";
}
public interface Error {
@ -117,10 +148,14 @@ public interface Stomp {
public interface Connected {
String SESSION = "session";
String RESPONSE_ID = "response-id";
String SERVER = "server";
String VERSION = "version";
String HEART_BEAT = "heart-beat";
}
public interface Ack {
String MESSAGE_ID = "message-id";
String SUBSCRIPTION = "subscription";
}
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.transport.stomp;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.wireformat.WireFormat;
import java.io.ByteArrayInputStream;
import java.util.HashMap;

View File

@ -123,7 +123,7 @@ public class StompConnection {
}
public void connect(String username, String password, String client) throws Exception {
HashMap<String, String> headers = new HashMap();
HashMap<String, String> headers = new HashMap<String, String>();
headers.put("login", username);
headers.put("passcode", password);
if (client != null) {
@ -235,7 +235,7 @@ public class StompConnection {
}
protected String appendHeaders(HashMap<String, Object> headers) {
StringBuffer result = new StringBuffer();
StringBuilder result = new StringBuilder();
for (String key : headers.keySet()) {
result.append(key + ":" + headers.get(key) + "\n");
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.transport.stomp;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.activemq.command.Command;
@ -180,12 +179,11 @@ public class StompFrame implements Command {
}
public String format(boolean forLogging) {
StringBuffer buffer = new StringBuffer();
StringBuilder buffer = new StringBuilder();
buffer.append(getAction());
buffer.append("\n");
Map headers = getHeaders();
for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry)iter.next();
Map<String, String> headers = getHeaders();
for (Map.Entry<String, String> entry : headers.entrySet()) {
buffer.append(entry.getKey());
buffer.append(":");
if (forLogging && entry.getKey().toString().toLowerCase().contains(Stomp.Headers.Connect.PASSCODE)) {

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import java.io.IOException;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.transport.AbstractInactivityMonitor;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Used to make sure that commands are arriving periodically from the peer of
* the transport.
*/
public class StompInactivityMonitor extends AbstractInactivityMonitor {
private static final Logger LOG = LoggerFactory.getLogger(StompInactivityMonitor.class);
private boolean isConfigured = false;
public StompInactivityMonitor(Transport next, WireFormat wireFormat) {
super(next, wireFormat);
}
public void startMonitoring() throws IOException {
this.isConfigured = true;
this.startMonitorThreads();
}
@Override
protected void processInboundWireFormatInfo(WireFormatInfo info) throws IOException {
}
@Override
protected void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException{
}
@Override
protected boolean configuredOk() throws IOException {
if (!isConfigured) {
return false;
}
LOG.debug("Stomp Inactivity Monitor read check: " + getReadCheckTime() +
", write check: " + getWriteCheckTime());
if (this.getReadCheckTime() >= 0 && this.getWriteCheckTime() >= 0) {
return true;
}
return false;
}
}

View File

@ -17,12 +17,10 @@
package org.apache.activemq.transport.stomp;
import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import javax.net.SocketFactory;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;

View File

@ -26,19 +26,14 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import javax.net.SocketFactory;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;

View File

@ -35,7 +35,6 @@ import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.xbean.XBeanBrokerService;
/**
* A <a href="http://stomp.codehaus.org/">STOMP</a> over NIO transport factory
@ -62,18 +61,13 @@ public class StompNIOTransportFactory extends NIOTransportFactory implements Bro
return new StompNIOTransport(wf, socketFactory, location, localLocation);
}
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), brokerContext);
transport = new StompTransportFilter(transport, format, brokerContext);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
protected boolean isUseInactivityMonitor(Transport transport) {
// lets disable the inactivity monitor as stomp does not use keep alive
// packets
return false;
}
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import java.io.IOException;
import javax.jms.JMSException;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.TransactionId;
public class StompQueueBrowserSubscription extends StompSubscription {
public StompQueueBrowserSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
super(stompTransport, subscriptionId, consumerInfo, transformation);
}
@Override
void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
if (md.getMessage() != null) {
super.onMessageDispatch(md);
} else {
StompFrame browseDone = new StompFrame(Stomp.Responses.MESSAGE);
browseDone.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, this.getSubscriptionId());
browseDone.getHeaders().put(Stomp.Headers.Message.BROWSER, "end");
browseDone.getHeaders().put(Stomp.Headers.Message.DESTINATION,
protocolConverter.findTranslator(null).convertDestination(protocolConverter, this.destination));
browseDone.getHeaders().put(Stomp.Headers.Message.MESSAGE_ID, "0");
protocolConverter.sendToStomp(browseDone);
}
}
@Override
public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
throw new ProtocolException("Cannot Nack a message on a Queue Browser Subscription.");
}
}

View File

@ -39,8 +39,9 @@ public class StompSslTransportFactory extends SslTransportFactory implements Bro
return "stomp";
}
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), brokerContext);
transport = new StompTransportFilter(transport, format, brokerContext);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}

View File

@ -45,17 +45,16 @@ public class StompSubscription {
public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
private final ProtocolConverter protocolConverter;
private final String subscriptionId;
private final ConsumerInfo consumerInfo;
protected final ProtocolConverter protocolConverter;
protected final String subscriptionId;
protected final ConsumerInfo consumerInfo;
private final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
private final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
private String ackMode = AUTO_ACK;
private ActiveMQDestination destination;
private String transformation;
protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
protected String ackMode = AUTO_ACK;
protected ActiveMQDestination destination;
protected String transformation;
public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
this.protocolConverter = stompTransport;
@ -105,14 +104,15 @@ public class StompSubscription {
}
synchronized void onStompCommit(TransactionId transactionId) {
for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
@SuppressWarnings("rawtypes")
Map.Entry entry = (Entry)iter.next();
MessageId id = (MessageId)entry.getKey();
MessageDispatch msg = (MessageDispatch)entry.getValue();
if (unconsumedMessage.contains(msg)) {
iter.remove();
}
}
unconsumedMessage.clear();
}
@ -131,8 +131,9 @@ public class StompSubscription {
if (ackMode == CLIENT_ACK) {
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
int count = 0;
for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
@SuppressWarnings("rawtypes")
Map.Entry entry = (Entry)iter.next();
MessageId id = (MessageId)entry.getKey();
MessageDispatch msg = (MessageDispatch)entry.getValue();
@ -149,21 +150,19 @@ public class StompSubscription {
iter.remove();
}
count++;
if (id.equals(msgId)) {
ack.setLastMessageId(id);
break;
}
}
ack.setMessageCount(count);
if (transactionId != null) {
ack.setTransactionId(transactionId);
}
}
else if (ackMode == INDIVIDUAL_ACK) {
} else if (ackMode == INDIVIDUAL_ACK) {
ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
ack.setMessageID(msgId);
if (transactionId != null) {
@ -175,6 +174,28 @@ public class StompSubscription {
return ack;
}
public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
MessageId msgId = new MessageId(messageId);
if (!dispatchedMessage.containsKey(msgId)) {
return null;
}
MessageAck ack = new MessageAck();
ack.setDestination(consumerInfo.getDestination());
ack.setConsumerId(consumerInfo.getConsumerId());
ack.setAckType(MessageAck.POSION_ACK_TYPE);
ack.setMessageID(msgId);
if (transactionId != null) {
unconsumedMessage.add(dispatchedMessage.get(msgId));
ack.setTransactionId(transactionId);
}
dispatchedMessage.remove(msgId);
return null;
}
public String getAckMode() {
return ackMode;
}
@ -198,5 +219,4 @@ public class StompSubscription {
public ConsumerInfo getConsumerInfo() {
return consumerInfo;
}
}

View File

@ -35,4 +35,7 @@ public interface StompTransport {
public void onException(IOException error);
public StompInactivityMonitor getInactivityMonitor();
public StompWireFormat getWireFormat();
}

View File

@ -16,21 +16,15 @@
*/
package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import javax.net.ServerSocketFactory;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.xbean.XBeanBrokerService;
/**
* A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
@ -45,19 +39,24 @@ public class StompTransportFactory extends TcpTransportFactory implements Broker
return "stomp";
}
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), brokerContext);
transport = new StompTransportFilter(transport, format, brokerContext);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
protected boolean isUseInactivityMonitor(Transport transport) {
// lets disable the inactivity monitor as stomp does not use keep alive
// packets
return false;
}
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
}
@Override
protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
StompInactivityMonitor monitor = new StompInactivityMonitor(transport, format);
StompTransportFilter filter = (StompTransportFilter) transport.narrow(StompTransportFilter.class);
filter.setInactivityMonitor(monitor);
return monitor;
}
}

View File

@ -28,6 +28,7 @@ import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,14 +43,18 @@ import org.slf4j.LoggerFactory;
public class StompTransportFilter extends TransportFilter implements StompTransport {
private static final Logger LOG = LoggerFactory.getLogger(StompTransportFilter.class);
private final ProtocolConverter protocolConverter;
private final FrameTranslator frameTranslator;
private StompInactivityMonitor monitor;
private StompWireFormat wireFormat;
private boolean trace;
public StompTransportFilter(Transport next, FrameTranslator translator, BrokerContext brokerContext) {
public StompTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
super(next);
this.frameTranslator = translator;
this.protocolConverter = new ProtocolConverter(this, translator, brokerContext);
this.protocolConverter = new ProtocolConverter(this, brokerContext);
if (wireFormat instanceof StompWireFormat) {
this.wireFormat = (StompWireFormat) wireFormat;
}
}
public void oneway(Object o) throws IOException {
@ -92,10 +97,6 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
}
}
public FrameTranslator getFrameTranslator() {
return frameTranslator;
}
public X509Certificate[] getPeerCertificates() {
if(next instanceof SslTransport) {
X509Certificate[] peerCerts = ((SslTransport)next).getPeerCertificates();
@ -114,4 +115,18 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
public void setTrace(boolean trace) {
this.trace = trace;
}
@Override
public StompInactivityMonitor getInactivityMonitor() {
return monitor;
}
public void setInactivityMonitor(StompInactivityMonitor monitor) {
this.monitor = monitor;
}
@Override
public StompWireFormat getWireFormat() {
return this.wireFormat;
}
}

View File

@ -21,8 +21,9 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.activemq.util.ByteArrayInputStream;
@ -44,6 +45,7 @@ public class StompWireFormat implements WireFormat {
private static final int MAX_HEADERS = 1000;
private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
private boolean encodingEnabled = false;
private int version = 1;
public ByteSequence marshal(Object command) throws IOException {
@ -63,16 +65,20 @@ public class StompWireFormat implements WireFormat {
public void marshal(Object command, DataOutput os) throws IOException {
StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame)command;
StringBuffer buffer = new StringBuffer();
if (stomp.getAction().equals(Stomp.Commands.KEEPALIVE)) {
os.write(Stomp.BREAK);
return;
}
StringBuilder buffer = new StringBuilder();
buffer.append(stomp.getAction());
buffer.append(Stomp.NEWLINE);
// Output the headers.
for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry)iter.next();
for (Map.Entry<String, String> entry : stomp.getHeaders().entrySet()) {
buffer.append(entry.getKey());
buffer.append(Stomp.Headers.SEPERATOR);
buffer.append(entry.getValue());
buffer.append(encodeHeader(entry.getValue()));
buffer.append(Stomp.NEWLINE);
}
@ -129,7 +135,6 @@ public class StompWireFormat implements WireFormat {
baos.close();
data = baos.toByteArray();
}
}
return new StompFrame(action, headers, data);
@ -137,10 +142,14 @@ public class StompWireFormat implements WireFormat {
} catch (ProtocolException e) {
return new StompFrameError(e);
}
}
private String readLine(DataInput in, int maxLength, String errorMessage) throws IOException {
ByteSequence sequence = readHeaderLine(in, maxLength, errorMessage);
return new String(sequence.getData(), sequence.getOffset(), sequence.getLength(), "UTF-8").trim();
}
private ByteSequence readHeaderLine(DataInput in, int maxLength, String errorMessage) throws IOException {
byte b;
ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
while ((b = in.readByte()) != '\n') {
@ -150,8 +159,7 @@ public class StompWireFormat implements WireFormat {
baos.write(b);
}
baos.close();
ByteSequence sequence = baos.toByteSequence();
return new String(sequence.getData(), sequence.getOffset(), sequence.getLength(), "UTF-8");
return baos.toByteSequence();
}
protected String parseAction(DataInput in) throws IOException {
@ -175,17 +183,31 @@ public class StompWireFormat implements WireFormat {
protected HashMap<String, String> parseHeaders(DataInput in) throws IOException {
HashMap<String, String> headers = new HashMap<String, String>(25);
while (true) {
String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
if (line != null && line.trim().length() > 0) {
ByteSequence line = readHeaderLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
if (line != null && line.length > 0) {
if (headers.size() > MAX_HEADERS) {
throw new ProtocolException("The maximum number of headers was exceeded", true);
}
try {
int seperatorIndex = line.indexOf(Stomp.Headers.SEPERATOR);
String name = line.substring(0, seperatorIndex).trim();
String value = line.substring(seperatorIndex + 1, line.length()).trim();
ByteArrayInputStream headerLine = new ByteArrayInputStream(line);
ByteArrayOutputStream stream = new ByteArrayOutputStream(line.length);
// First complete the name
int result = -1;
while ((result = headerLine.read()) != -1) {
if (result != ':') {
stream.write(result);
} else {
break;
}
}
ByteSequence nameSeq = stream.toByteSequence();
String name = new String(nameSeq.getData(), nameSeq.getOffset(), nameSeq.getLength(), "UTF-8").trim();
String value = decodeHeader(headerLine).trim();
headers.put(name, value);
} catch (Exception e) {
throw new ProtocolException("Unable to parser header line [" + line + "]", true);
@ -212,6 +234,67 @@ public class StompWireFormat implements WireFormat {
return length;
}
private String encodeHeader(String header) throws IOException {
String result = header;
if (this.encodingEnabled) {
byte[] utf8buf = header.getBytes("UTF-8");
ByteArrayOutputStream stream = new ByteArrayOutputStream(utf8buf.length);
for(byte val : utf8buf) {
switch(val) {
case Stomp.ESCAPE:
stream.write(Stomp.ESCAPE_ESCAPE_SEQ);
break;
case Stomp.BREAK:
stream.write(Stomp.NEWLINE_ESCAPE_SEQ);
break;
case Stomp.COLON:
stream.write(Stomp.COLON_ESCAPE_SEQ);
break;
default:
stream.write(val);
}
}
}
return result;
}
private String decodeHeader(InputStream header) throws IOException {
ByteArrayOutputStream decoded = new ByteArrayOutputStream();
PushbackInputStream stream = new PushbackInputStream(header);
int value = -1;
while( (value = stream.read()) != -1) {
if (value == 92) {
int next = stream.read();
if (next != -1) {
switch(next) {
case 110:
decoded.write(Stomp.BREAK);
break;
case 99:
decoded.write(Stomp.COLON);
break;
case 92:
decoded.write(Stomp.ESCAPE);
break;
default:
stream.unread(next);
decoded.write(value);
}
} else {
decoded.write(value);
}
} else {
decoded.write(value);
}
}
return new String(decoded.toByteArray(), "UTF-8");
}
public int getVersion() {
return version;
}
@ -220,4 +303,12 @@ public class StompWireFormat implements WireFormat {
this.version = version;
}
public boolean isEncodingEnabled() {
return this.encodingEnabled;
}
public void setEncodingEnabled(boolean value) {
this.encodingEnabled = value;
}
}

View File

@ -20,9 +20,6 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
@ -30,21 +27,13 @@ import java.util.Map;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocketFactory;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
@ -91,6 +80,7 @@ public class SslTransportFactory extends TcpTransportFactory {
* Overriding to allow for proper configuration through reflection but delegate to get common
* configuration
*/
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
SslTransport sslTransport = (SslTransport)transport.narrow(SslTransport.class);
@ -120,8 +110,6 @@ public class SslTransportFactory extends TcpTransportFactory {
return new SslTransport(wf, (SSLSocketFactory)socketFactory, location, localLocation, false);
}
/**
* Creates a new SSL ServerSocketFactory. The given factory will use
* user-provided key and trust managers (if the user provided them).
@ -161,7 +149,6 @@ public class SslTransportFactory extends TcpTransportFactory {
} else {
return SSLSocketFactory.getDefault();
}
}
/**

View File

@ -20,16 +20,22 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
import org.apache.activemq.Service;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.transport.Transport;
@ -64,7 +70,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
protected DataInputStream dataIn;
protected TimeStampStream buffOut = null;
/**
* The Traffic Class to be set on the socket.
*/
@ -636,7 +641,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
return receiveCounter;
}
/**
* @param sock The socket on which to set the Traffic Class.
* @return Whether or not the Traffic Class was set on the given socket.

View File

@ -79,6 +79,7 @@ public class TcpTransportFactory extends TransportFactory {
return new TcpTransportServer(this, location, serverSocketFactory);
}
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
TcpTransport tcpTransport = (TcpTransport)transport.narrow(TcpTransport.class);
@ -98,11 +99,10 @@ public class TcpTransportFactory extends TransportFactory {
boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
if (useInactivityMonitor && isUseInactivityMonitor(transport)) {
transport = new InactivityMonitor(transport, format);
transport = createInactivityMonitor(transport, format);
IntrospectionSupport.setProperties(transport, options);
}
// Only need the WireFormatNegotiator if using openwire
if (format instanceof OpenWireFormat) {
transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
@ -162,4 +162,8 @@ public class TcpTransportFactory extends TransportFactory {
protected SocketFactory createSocketFactory() throws IOException {
return SocketFactory.getDefault();
}
protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
return new InactivityMonitor(transport, format);
}
}

View File

@ -27,7 +27,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

View File

@ -0,0 +1,546 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Stomp11Test extends CombinationTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompTest.class);
protected String bindAddress = "stomp://localhost:61613";
protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
protected String jmsUri = "vm://localhost";
private BrokerService broker;
private StompConnection stompConnection = new StompConnection();
@Override
protected void setUp() throws Exception {
broker = BrokerFactory.createBroker(new URI(confUri));
broker.start();
broker.waitUntilStarted();
stompConnect();
}
private void stompConnect() throws IOException, URISyntaxException, UnknownHostException {
URI connectUri = new URI(bindAddress);
stompConnection.open(createSocket(connectUri));
}
protected Socket createSocket(URI connectUri) throws IOException {
return new Socket("127.0.0.1", connectUri.getPort());
}
protected String getQueueName() {
return getClass().getName() + "." + getName();
}
@Override
protected void tearDown() throws Exception {
try {
stompDisconnect();
} catch(Exception e) {
// Some tests explicitly disconnect from stomp so can ignore
} finally {
broker.stop();
broker.waitUntilStopped();
}
}
private void stompDisconnect() throws IOException {
if (stompConnection != null) {
stompConnection.close();
stompConnection = null;
}
}
public void testConnect() throws Exception {
String connectFrame = "STOMP\n" +
"login: system\n" +
"passcode: manager\n" +
"accept-version:1.1\n" +
"host:localhost\n" +
"request-id: 1\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("response-id:1") >= 0);
assertTrue(f.indexOf("version:1.1") >= 0);
assertTrue(f.indexOf("session:") >= 0);
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testConnectWithVersionOptions() throws Exception {
String connectFrame = "STOMP\n" +
"login: system\n" +
"passcode: manager\n" +
"accept-version:1.0,1.1\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("version:1.1") >= 0);
assertTrue(f.indexOf("session:") >= 0);
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testConnectWithValidFallback() throws Exception {
String connectFrame = "STOMP\n" +
"login: system\n" +
"passcode: manager\n" +
"accept-version:1.0,10.1\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("version:1.0") >= 0);
assertTrue(f.indexOf("session:") >= 0);
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testConnectWithInvalidFallback() throws Exception {
String connectFrame = "STOMP\n" +
"login: system\n" +
"passcode: manager\n" +
"accept-version:9.0,10.1\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("ERROR"));
assertTrue(f.indexOf("version") >= 0);
assertTrue(f.indexOf("message:") >= 0);
}
public void testHeartbeats() throws Exception {
String connectFrame = "STOMP\n" +
"login: system\n" +
"passcode: manager\n" +
"accept-version:1.1\n" +
"heart-beat:0,1000\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("version:1.1") >= 0);
assertTrue(f.indexOf("heart-beat:") >= 0);
assertTrue(f.indexOf("session:") >= 0);
LOG.debug("Broker sent: " + f);
stompConnection.getStompSocket().getOutputStream().write('\n');
DataInputStream in = new DataInputStream(stompConnection.getStompSocket().getInputStream());
in.read();
{
long startTime = System.currentTimeMillis();
int input = in.read();
assertEquals("did not receive the correct hear beat value", '\n', input);
long endTime = System.currentTimeMillis();
assertTrue("Broker did not send KeepAlive in time", (endTime - startTime) >= 900);
}
{
long startTime = System.currentTimeMillis();
int input = in.read();
assertEquals("did not receive the correct hear beat value", '\n', input);
long endTime = System.currentTimeMillis();
assertTrue("Broker did not send KeepAlive in time", (endTime - startTime) >= 900);
}
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testHeartbeatsDropsIdleConnection() throws Exception {
String connectFrame = "STOMP\n" +
"login: system\n" +
"passcode: manager\n" +
"accept-version:1.1\n" +
"heart-beat:1000,0\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("version:1.1") >= 0);
assertTrue(f.indexOf("heart-beat:") >= 0);
assertTrue(f.indexOf("session:") >= 0);
LOG.debug("Broker sent: " + f);
long startTime = System.currentTimeMillis();
try {
f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
fail();
} catch(Exception e) {
}
long endTime = System.currentTimeMillis();
assertTrue("Broker did close idle connection in time.", (endTime - startTime) >= 1000);
}
public void testRejectInvalidHeartbeats1() throws Exception {
String connectFrame = "STOMP\n" +
"login: system\n" +
"passcode: manager\n" +
"accept-version:1.1\n" +
"heart-beat:0\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("ERROR"));
assertTrue(f.indexOf("heart-beat") >= 0);
assertTrue(f.indexOf("message:") >= 0);
}
public void testRejectInvalidHeartbeats2() throws Exception {
String connectFrame = "STOMP\n" +
"login: system\n" +
"passcode: manager\n" +
"accept-version:1.1\n" +
"heart-beat:T,0\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("ERROR"));
assertTrue(f.indexOf("heart-beat") >= 0);
assertTrue(f.indexOf("message:") >= 0);
}
public void testRejectInvalidHeartbeats3() throws Exception {
String connectFrame = "STOMP\n" +
"login: system\n" +
"passcode: manager\n" +
"accept-version:1.1\n" +
"heart-beat:100,10,50\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("ERROR"));
assertTrue(f.indexOf("heart-beat") >= 0);
assertTrue(f.indexOf("message:") >= 0);
}
public void testSubscribeAndUnsubscribe() throws Exception {
String connectFrame = "STOMP\n" +
"login: system\n" +
"passcode: manager\n" +
"accept-version:1.1\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(message);
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
Thread.sleep(2000);
stompConnection.sendFrame(message);
try {
frame = stompConnection.receiveFrame();
LOG.info("Received frame: " + frame);
fail("No message should have been received since subscription was removed");
} catch (SocketTimeoutException e) {
}
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testSubscribeWithNoId() throws Exception {
String connectFrame = "STOMP\n" +
"login: system\n" +
"passcode: manager\n" +
"accept-version:1.1\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testUnsubscribeWithNoId() throws Exception {
String connectFrame = "STOMP\n" +
"login: system\n" +
"passcode: manager\n" +
"accept-version:1.1\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
Thread.sleep(2000);
frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testAckMessageWithId() throws Exception {
String connectFrame = "STOMP\n" +
"login: system\n" +
"passcode: manager\n" +
"accept-version:1.1\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(message);
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n" + "ack:client\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame received = stompConnection.receive();
assertTrue(received.getAction().equals("MESSAGE"));
frame = "ACK\n" + "subscription:12345\n" + "message-id:" +
received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testAckMessageWithNoId() throws Exception {
String connectFrame = "STOMP\n" +
"login: system\n" +
"passcode: manager\n" +
"accept-version:1.1\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(message);
String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n" + "ack:client\n\n" + Stomp.NULL;
stompConnection.sendFrame(subscribe);
StompFrame received = stompConnection.receive();
assertTrue(received.getAction().equals("MESSAGE"));
String ack = "ACK\n" + "message-id:" +
received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(ack);
StompFrame error = stompConnection.receive();
assertTrue(error.getAction().equals("ERROR"));
String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(unsub);
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testQueueBrowerSubscription() throws Exception {
final int MSG_COUNT = 10;
String connectFrame = "STOMP\n" +
"login: system\n" +
"passcode: manager\n" +
"accept-version:1.1\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
for(int i = 0; i < MSG_COUNT; ++i) {
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
"receipt:0\n" +
"\n" + "Hello World {" + i + "}" + Stomp.NULL;
stompConnection.sendFrame(message);
StompFrame repsonse = stompConnection.receive();
assertEquals("0", repsonse.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID));
}
String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n" + "browser:true\n\n" + Stomp.NULL;
stompConnection.sendFrame(subscribe);
for(int i = 0; i < MSG_COUNT; ++i) {
StompFrame message = stompConnection.receive();
assertEquals(Stomp.Responses.MESSAGE, message.getAction());
assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
}
// We should now get a browse done message
StompFrame browseDone = stompConnection.receive();
LOG.debug("Browse Done: " + browseDone.toString());
assertEquals(Stomp.Responses.MESSAGE, browseDone.getAction());
assertEquals("12345", browseDone.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
assertEquals("end", browseDone.getHeaders().get(Stomp.Headers.Message.BROWSER));
assertTrue(browseDone.getHeaders().get(Stomp.Headers.Message.DESTINATION) != null);
String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(unsub);
Thread.sleep(2000);
subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(subscribe);
for(int i = 0; i < MSG_COUNT; ++i) {
StompFrame message = stompConnection.receive();
assertEquals(Stomp.Responses.MESSAGE, message.getAction());
assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
}
stompConnection.sendFrame(unsub);
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
}

View File

@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
@ -36,6 +37,7 @@ import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerFactory;

View File

@ -21,9 +21,9 @@ import java.security.cert.X509Certificate;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.TransportSupport;
import org.apache.activemq.transport.stomp.LegacyFrameTranslator;
import org.apache.activemq.transport.stomp.ProtocolConverter;
import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.activemq.transport.stomp.StompInactivityMonitor;
import org.apache.activemq.transport.stomp.StompTransport;
import org.apache.activemq.transport.stomp.StompWireFormat;
import org.apache.activemq.util.ByteSequence;
@ -38,7 +38,7 @@ import org.eclipse.jetty.websocket.WebSocket;
*/
class StompSocket extends TransportSupport implements WebSocket, StompTransport {
Outbound outbound;
ProtocolConverter protocolConverter = new ProtocolConverter(this, new LegacyFrameTranslator(), null);
ProtocolConverter protocolConverter = new ProtocolConverter(this, null);
StompWireFormat wireFormat = new StompWireFormat();
public void onConnect(Outbound outbound) {
@ -91,4 +91,14 @@ class StompSocket extends TransportSupport implements WebSocket, StompTransport
public void sendToStomp(StompFrame command) throws IOException {
outbound.sendMessage(WebSocket.SENTINEL_FRAME, command.format());
}
@Override
public StompInactivityMonitor getInactivityMonitor() {
return null;
}
@Override
public StompWireFormat getWireFormat() {
return this.wireFormat;
}
}