resolve https://issues.apache.org/activemq/browse/AMQ-2737 - have nio work with soWritetTimeout filter

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@945692 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-05-18 15:20:33 +00:00
parent 5405ec8c49
commit 21891008c5
9 changed files with 116 additions and 45 deletions

View File

@ -26,6 +26,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
import org.apache.activemq.transport.tcp.TimeStampStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -97,8 +98,8 @@ public class WriteTimeoutFilter extends TransportFilter {
}
protected TcpBufferedOutputStream getWriter() {
return next.narrow(TcpBufferedOutputStream.class);
protected TimeStampStream getWriter() {
return next.narrow(TimeStampStream.class);
}
protected Socket getSocket() {

View File

@ -23,13 +23,15 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import org.apache.activemq.transport.tcp.TimeStampStream;
/**
* An optimized buffered outputstream for Tcp
*
* @version $Revision: 1.1.1.1 $
*/
public class NIOOutputStream extends OutputStream {
public class NIOOutputStream extends OutputStream implements TimeStampStream {
private static final int BUFFER_SIZE = 8192;
@ -39,6 +41,7 @@ public class NIOOutputStream extends OutputStream {
private int count;
private boolean closed;
private volatile long writeTimestamp = -1;//concurrent reads of this value
/**
* Constructor
@ -149,6 +152,8 @@ public class NIOOutputStream extends OutputStream {
int remaining = data.remaining();
int lastRemaining = remaining - 1;
long delay = 1;
try {
writeTimestamp = System.currentTimeMillis();
while (remaining > 0) {
// We may need to do a little bit of sleeping to avoid a busy loop.
@ -174,6 +179,24 @@ public class NIOOutputStream extends OutputStream {
out.write(data);
remaining = data.remaining();
}
} finally {
writeTimestamp = -1;
}
}
/* (non-Javadoc)
* @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting()
*/
public boolean isWriting() {
return writeTimestamp > 0;
}
/* (non-Javadoc)
* @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
*/
public long getWriteTimestamp() {
return writeTimestamp;
}
}

View File

@ -83,8 +83,9 @@ public class NIOTransport extends TcpTransport {
currentBuffer = inputBuffer;
nextFrameSize = -1;
currentBuffer.limit(4);
this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 * 1024));
NIOOutputStream outPutStream = new NIOOutputStream(channel, 16 * 1024);
this.dataOut = new DataOutputStream(outPutStream);
this.buffOut = outPutStream;
}
private void serviceRead() {

View File

@ -82,18 +82,25 @@ public class StompConnection {
throw new IOException("socket closed.");
} else if (c == 0) {
c = is.read();
if (c != '\n') {
throw new IOException("Expecting stomp frame to terminate with \0\n");
if (c == '\n') {
// end of frame
return stringFromBuffer(inputBuffer);
} else {
inputBuffer.write(0);
inputBuffer.write(c);
}
byte[] ba = inputBuffer.toByteArray();
inputBuffer.reset();
return new String(ba, "UTF-8");
} else {
inputBuffer.write(c);
}
}
}
private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
byte[] ba = inputBuffer.toByteArray();
inputBuffer.reset();
return new String(ba, "UTF-8");
}
public Socket getStompSocket() {
return stompSocket;
}

View File

@ -83,7 +83,9 @@ public class StompNIOTransport extends TcpTransport {
});
inputBuffer = ByteBuffer.allocate(8 * 1024);
this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 * 1024));
NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
this.dataOut = new DataOutputStream(outPutStream);
this.buffOut = outPutStream;
}
private void serviceRead() {

View File

@ -27,7 +27,7 @@ import java.io.OutputStream;
* @version $Revision: 1.1.1.1 $
*/
public class TcpBufferedOutputStream extends FilterOutputStream {
public class TcpBufferedOutputStream extends FilterOutputStream implements TimeStampStream {
private static final int BUFFER_SIZE = 8192;
private byte[] buffer;
private int bufferlen;
@ -129,10 +129,16 @@ public class TcpBufferedOutputStream extends FilterOutputStream {
super.close();
}
/* (non-Javadoc)
* @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting()
*/
public boolean isWriting() {
return writeTimestamp > 0;
}
/* (non-Javadoc)
* @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
*/
public long getWriteTimestamp() {
return writeTimestamp;
}

View File

@ -67,7 +67,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
protected Socket socket;
protected DataOutputStream dataOut;
protected DataInputStream dataIn;
protected TcpBufferedOutputStream buffOut = null;
protected TimeStampStream buffOut = null;
/**
* The Traffic Class to be set on the socket.
*/
@ -576,8 +576,9 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
}
};
this.dataIn = new DataInputStream(buffIn);
buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
this.dataOut = new DataOutputStream(buffOut);
TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
this.dataOut = new DataOutputStream(outputStream);
this.buffOut = outputStream;
}
protected void closeStreams() throws IOException {
@ -604,7 +605,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
public <T> T narrow(Class<T> target) {
if (target == Socket.class) {
return target.cast(socket);
} else if ( target == TcpBufferedOutputStream.class) {
} else if ( target == TimeStampStream.class) {
return target.cast(buffOut);
}
return super.narrow(target);

View File

@ -47,6 +47,7 @@ public class JmsTestSupport extends CombinationTestSupport {
static final private AtomicLong TEST_COUNTER = new AtomicLong();
public String userName;
public String password;
public String messageTextPrefix = "";
protected ConnectionFactory factory;
protected ActiveMQConnection connection;
@ -96,7 +97,7 @@ public class JmsTestSupport extends CombinationTestSupport {
protected void sendMessages(Session session, Destination destination, int count) throws JMSException {
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < count; i++) {
producer.send(session.createTextMessage("" + i));
producer.send(session.createTextMessage(messageTextPrefix + i));
}
producer.close();
}

View File

@ -20,6 +20,7 @@ package org.apache.activemq.util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
@ -38,10 +39,11 @@ public class SocketProxy {
private static final transient Log LOG = LogFactory.getLog(SocketProxy.class);
public static final int ACCEPT_TIMEOUT_MILLIS = 1000;
public static final int ACCEPT_TIMEOUT_MILLIS = 100;
private URI proxyUrl;
private URI target;
private Acceptor acceptor;
private ServerSocket serverSocket;
@ -49,6 +51,11 @@ public class SocketProxy {
private int listenPort = 0;
private int receiveBufferSize = -1;
public SocketProxy() throws Exception {
}
public SocketProxy(URI uri) throws Exception {
this(0, uri);
}
@ -59,12 +66,24 @@ public class SocketProxy {
open();
}
protected void open() throws Exception {
public void setReceiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize;
}
public void setTarget(URI tcpBrokerUri) {
target = tcpBrokerUri;
}
public void open() throws Exception {
serverSocket = new ServerSocket();
if (receiveBufferSize > 0) {
serverSocket.setReceiveBufferSize(receiveBufferSize);
}
if (proxyUrl == null) {
serverSocket = new ServerSocket(listenPort);
serverSocket.bind(new InetSocketAddress(listenPort));
proxyUrl = urlFromSocket(target, serverSocket);
} else {
serverSocket = new ServerSocket(proxyUrl.getPort());
serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()));
}
acceptor = new Acceptor(serverSocket, target);
new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start();
@ -151,9 +170,13 @@ public class SocketProxy {
public Connection(Socket socket, URI target) throws Exception {
receiveSocket = socket;
sendSocket = new Socket(target.getHost(), target.getPort());
sendSocket = new Socket();
if (receiveBufferSize > 0) {
sendSocket.setReceiveBufferSize(receiveBufferSize);
}
sendSocket.connect(new InetSocketAddress(target.getHost(), target.getPort()));
linkWithThreads(receiveSocket, sendSocket);
LOG.info("proxy connection " + sendSocket);
LOG.info("proxy connection " + sendSocket + ", receiveBufferSize=" + sendSocket.getReceiveBufferSize());
}
public void goOn() {
@ -210,6 +233,7 @@ public class SocketProxy {
while (true) {
int len = in.read(buf);
if (len == -1) {
LOG.debug("read eof from:" + src);
break;
}
pause.get().await();
@ -259,7 +283,12 @@ public class SocketProxy {
pause.get().await();
try {
Socket source = socket.accept();
LOG.info("accepted " + source);
LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize());
pause.get().await();
if (receiveBufferSize > 0) {
source.setReceiveBufferSize(receiveBufferSize);
}
LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize());
synchronized(connections) {
connections.add(new Connection(source, target));
}