apply patch for AMQ-1993, thanks

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@710109 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2008-11-03 17:30:59 +00:00
parent 0d2172bc9b
commit f561d6bb67
4 changed files with 183 additions and 5 deletions

View File

@ -26,8 +26,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.SslContext;
@ -44,6 +42,7 @@ public abstract class TransportFactory {
private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
private static final String THREAD_NAME_FILTER = "threadName";
public abstract TransportServer doBind(URI location) throws IOException;
@ -265,6 +264,11 @@ public abstract class TransportFactory {
* @throws Exception
*/
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
transport = new WriteTimeoutFilter(transport);
String soWriteTimeout = (String)options.get(WRITE_TIMEOUT_FILTER);
if (soWriteTimeout!=null) ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
}
if (options.containsKey(THREAD_NAME_FILTER)) {
transport = new ThreadNameFilter(transport);
}

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
import org.apache.activemq.transport.tcp.TcpTransport;
/**
* This filter implements write timeouts for socket write operations.
* When using blocking IO, the Java implementation doesn't have an explicit flag
* to set a timeout, and can cause operations to block forever (or until the TCP stack implementation times out the retransmissions,
* which is usually around 13-30 minutes).<br/>
* To enable this transport, in the transport URI, simpley add<br/>
* <code>transport.soWriteTimeout=<value in millis></code>.<br/>
* For example (15 second timeout on write operations to the socket):</br>
* <pre><code>
* &lt;transportConnector
* name=&quot;tcp1&quot;
* uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
* /&gt;
* </code></pre><br/>
* For example (enable default timeout on the socket):</br>
* <pre><code>
* &lt;transportConnector
* name=&quot;tcp1&quot;
* uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
* /&gt;
* </code></pre>
* @author Filip Hanik
*
*/
public class WriteTimeoutFilter extends TransportFilter {
protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>();
protected static AtomicInteger messageCounter = new AtomicInteger(0);
protected static TimeoutThread timeoutThread = new TimeoutThread();
protected long writeTimeout = -1;
public WriteTimeoutFilter(Transport next) {
super(next);
}
@Override
public void oneway(Object command) throws IOException {
try {
registerWrite(this);
super.oneway(command);
} catch (IOException x) {
deRegisterWrite(this,true,x);
throw x;
} finally {
deRegisterWrite(this,false,null);
}
}
public long getWriteTimeout() {
return writeTimeout;
}
public void setWriteTimeout(long writeTimeout) {
this.writeTimeout = writeTimeout;
}
protected TcpBufferedOutputStream getWriter() {
return next.narrow(TcpBufferedOutputStream.class);
}
protected static void registerWrite(WriteTimeoutFilter filter) {
writers.add(filter);
}
protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
boolean result = writers.remove(filter);
if (result) {
if (fail) {
IOException ex = (iox!=null)?iox:new IOException("Forced write timeout for:"+filter.getNext().getRemoteAddress());
filter.getTransportListener().onException(ex);
}
}
return result;
}
@Override
public void start() throws Exception {
super.start();
}
@Override
public void stop() throws Exception {
super.stop();
}
protected static class TimeoutThread extends Thread {
static AtomicInteger instance = new AtomicInteger(0);
boolean run = true;
public TimeoutThread() {
setName("WriteTimeoutFilter-Timeout-"+instance.incrementAndGet());
setDaemon(true);
setPriority(Thread.MIN_PRIORITY);
start();
}
public void run() {
while (run) {
if (!interrupted()) {
Iterator<WriteTimeoutFilter> filters = writers.iterator();
while (run && filters.hasNext()) {
WriteTimeoutFilter filter = filters.next();
if (filter.getWriteTimeout()<=0) continue; //no timeout set
long writeStart = filter.getWriter().getWriteTimestamp();
long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
if (delta>filter.getWriteTimeout()) {
WriteTimeoutFilter.deRegisterWrite(filter, true,null);
}//if timeout
}//while
}//if interrupted
try {
Thread.sleep(5000);
} catch (InterruptedException x) {
//do nothing
}
}
}
}
}

View File

@ -32,6 +32,8 @@ public class TcpBufferedOutputStream extends FilterOutputStream {
private byte[] buffer;
private int bufferlen;
private int count;
private volatile long writeTimestamp = -1;//concurrent reads of this value
/**
* Constructor
@ -89,7 +91,12 @@ public class TcpBufferedOutputStream extends FilterOutputStream {
System.arraycopy(b, off, buffer, count, len);
count += len;
} else {
out.write(b, off, len);
try {
writeTimestamp = System.currentTimeMillis();
out.write(b, off, len);
} finally {
writeTimestamp = System.currentTimeMillis();
}
}
}
}
@ -103,7 +110,12 @@ public class TcpBufferedOutputStream extends FilterOutputStream {
*/
public void flush() throws IOException {
if (count > 0 && out != null) {
out.write(buffer, 0, count);
try {
writeTimestamp = System.currentTimeMillis();
out.write(buffer, 0, count);
} finally {
writeTimestamp = -1;
}
count = 0;
}
}
@ -117,4 +129,12 @@ public class TcpBufferedOutputStream extends FilterOutputStream {
super.close();
}
public boolean isWriting() {
return writeTimestamp > 0;
}
public long getWriteTimestamp() {
return writeTimestamp;
}
}

View File

@ -69,6 +69,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
protected Socket socket;
protected DataOutputStream dataOut;
protected DataInputStream dataIn;
protected TcpBufferedOutputStream buffOut = null;
/**
* trace=true -> the Transport stack where this TcpTransport
* object will be, will have a TransportLogger layer
@ -505,7 +506,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
protected void initializeStreams() throws Exception {
TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
this.dataIn = new DataInputStream(buffIn);
TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
this.dataOut = new DataOutputStream(buffOut);
}
@ -533,9 +534,12 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
public <T> T narrow(Class<T> target) {
if (target == Socket.class) {
return target.cast(socket);
} else if ( target == TcpBufferedOutputStream.class) {
return target.cast(buffOut);
}
return super.narrow(target);
}
static {
SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {