Added request(Command. timeout) to transport and added a timeout on

close from ActiveMQConnection

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@384853 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-03-10 17:03:31 +00:00
parent f51ac13f85
commit 5fe0a4ca8a
12 changed files with 147 additions and 2 deletions

View File

@ -116,6 +116,7 @@ public class ActiveMQConnection extends DefaultTransportListener implements Conn
protected boolean asyncDispatch = true; protected boolean asyncDispatch = true;
private boolean useAsyncSend = false; private boolean useAsyncSend = false;
private boolean useRetroactiveConsumer; private boolean useRetroactiveConsumer;
private int closeTimeout = 15000;
private long flowControlSleepTime = 0; private long flowControlSleepTime = 0;
private final JMSConnectionStatsImpl stats; private final JMSConnectionStatsImpl stats;
@ -541,7 +542,7 @@ public class ActiveMQConnection extends DefaultTransportListener implements Conn
if (isConnectionInfoSentToBroker) { if (isConnectionInfoSentToBroker) {
syncSendPacket(info.createRemoveCommand()); syncSendPacket(info.createRemoveCommand(),closeTimeout);
} }
asyncSendPacket(new ShutdownInfo()); asyncSendPacket(new ShutdownInfo());
@ -734,6 +735,22 @@ public class ActiveMQConnection extends DefaultTransportListener implements Conn
this.optimizedMessageDispatch = dispatchOptimizedMessage; this.optimizedMessageDispatch = dispatchOptimizedMessage;
} }
/**
* @return Returns the closeTimeout.
*/
public int getCloseTimeout(){
return closeTimeout;
}
/**
* @param closeTimeout The closeTimeout to set.
*/
public void setCloseTimeout(int closeTimeout){
this.closeTimeout=closeTimeout;
}
/** /**
* *
* @return Returns the onSendPrepareMessageBody. * @return Returns the onSendPrepareMessageBody.
@ -1077,6 +1094,41 @@ public class ActiveMQConnection extends DefaultTransportListener implements Conn
} }
} }
} }
/**
* Send a packet through a Connection - for internal use only
*
* @param command
* @return
* @throws JMSException
*/
public Response syncSendPacket(Command command, int timeout) throws JMSException {
if (isClosed()) {
throw new ConnectionClosedException();
} else {
if (command.isMessage() && flowControlSleepTime > 0) {
try {
Thread.sleep(flowControlSleepTime);
} catch (InterruptedException e) {
}
}
try {
Response response = this.transport.request(command,timeout);
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
if (er.getException() instanceof JMSException)
throw (JMSException) er.getException();
else
throw JMSExceptionSupport.create(er.getException());
}
return response;
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
}
public boolean isClosed() { public boolean isClosed() {
return closed.get(); return closed.get();

View File

@ -81,6 +81,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private boolean objectMessageSerializationDefered = false; private boolean objectMessageSerializationDefered = false;
protected boolean asyncDispatch = true; protected boolean asyncDispatch = true;
private boolean useAsyncSend = false; private boolean useAsyncSend = false;
private int closeTimeout = 15000;
private boolean useRetroactiveConsumer; private boolean useRetroactiveConsumer;
JMSStatsImpl factoryStats = new JMSStatsImpl(); JMSStatsImpl factoryStats = new JMSStatsImpl();
@ -415,6 +416,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
props.setProperty("useCompression", Boolean.toString(isUseCompression())); props.setProperty("useCompression", Boolean.toString(isUseCompression()));
props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer())); props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
props.setProperty("userName", getUserName()); props.setProperty("userName", getUserName());
props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
} }
public boolean isOnSendPrepareMessageBody() { public boolean isOnSendPrepareMessageBody() {
@ -448,4 +450,18 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
public void setAsyncDispatch(boolean asyncDispatch) { public void setAsyncDispatch(boolean asyncDispatch) {
this.asyncDispatch = asyncDispatch; this.asyncDispatch = asyncDispatch;
} }
/**
* @return Returns the closeTimeout.
*/
public int getCloseTimeout(){
return closeTimeout;
}
/**
* @param closeTimeout The closeTimeout to set.
*/
public void setCloseTimeout(int closeTimeout){
this.closeTimeout=closeTimeout;
}
} }

View File

@ -19,6 +19,8 @@ package org.apache.activemq.transport;
import edu.emory.mathcs.backport.java.util.concurrent.Callable; import edu.emory.mathcs.backport.java.util.concurrent.Callable;
import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException; import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException;
import edu.emory.mathcs.backport.java.util.concurrent.FutureTask; import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.TimeoutException;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
@ -52,6 +54,23 @@ public class FutureResponse extends FutureTask {
} }
} }
public synchronized Response getResult(int timeout) throws IOException {
try {
return (Response) super.get(timeout,TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted.");
} catch (ExecutionException e) {
Throwable target = e.getCause();
if( target instanceof IOException ) {
throw (IOException)target;
} else {
throw IOExceptionSupport.create(target);
}
}catch(TimeoutException e){
return null;
}
}
public synchronized void set(Object result) { public synchronized void set(Object result) {
super.set(result); super.set(result);
} }

View File

@ -51,6 +51,12 @@ public class MutexTransport extends TransportFilter {
} }
} }
public Response request(Command command,int timeout) throws IOException {
synchronized(writeMutex){
return next.request(command,timeout);
}
}
public String toString() { public String toString() {
return next.toString(); return next.toString();
} }

View File

@ -67,6 +67,11 @@ final public class ResponseCorrelator extends TransportFilter {
return response.getResult(); return response.getResult();
} }
public Response request(Command command,int timeout) throws IOException {
FutureResponse response = asyncRequest(command);
return response.getResult(timeout);
}
public void onCommand(Command command) { public void onCommand(Command command) {
boolean debug = log.isDebugEnabled(); boolean debug = log.isDebugEnabled();
if( command.isResponse() ) { if( command.isResponse() ) {

View File

@ -32,30 +32,53 @@ public interface Transport extends Service {
/** /**
* A one way asynchronous send * A one way asynchronous send
* @param command
* @throws IOException
*/ */
public void oneway(Command command) throws IOException; public void oneway(Command command) throws IOException;
/** /**
* An asynchronous request response where the Receipt will be returned * An asynchronous request response where the Receipt will be returned
* in the future * in the future
* @param command
* @return the FutureResponse
* @throws IOException
*/ */
public FutureResponse asyncRequest(Command command) throws IOException; public FutureResponse asyncRequest(Command command) throws IOException;
/** /**
* A synchronous request response * A synchronous request response
* @param command
* @return the response
* @throws IOException
*/ */
public Response request(Command command) throws IOException; public Response request(Command command) throws IOException;
/**
* A synchronous request response
* @param command
* @param timeout
* @return the repsonse or null if timeout
* @throws IOException
*/
public Response request(Command command, int timeout) throws IOException;
/** /**
* Returns the current transport listener * Returns the current transport listener
* @return
*/ */
public TransportListener getTransportListener(); public TransportListener getTransportListener();
/** /**
* Registers an inbound command listener * Registers an inbound command listener
* @param commandListener
*/ */
public void setTransportListener(TransportListener commandListener); public void setTransportListener(TransportListener commandListener);
/**
* @param target
* @return the target
*/
public Object narrow(Class target); public Object narrow(Class target);
} }

View File

@ -93,6 +93,10 @@ public class TransportFilter extends DefaultTransportListener implements Transpo
public Response request(Command command) throws IOException { public Response request(Command command) throws IOException {
return next.request(command); return next.request(command);
} }
public Response request(Command command,int timeout) throws IOException {
return next.request(command,timeout);
}
public void onException(IOException error) { public void onException(IOException error) {
transportListener.onException(error); transportListener.onException(error);

View File

@ -73,6 +73,10 @@ public abstract class TransportSupport extends ServiceSupport implements Transpo
public Response request(Command command) throws IOException { public Response request(Command command) throws IOException {
throw new AssertionError("Unsupported Method"); throw new AssertionError("Unsupported Method");
} }
public Response request(Command command,int timeout) throws IOException {
throw new AssertionError("Unsupported Method");
}
/** /**
* Process the inbound command * Process the inbound command

View File

@ -389,6 +389,10 @@ public class FailoverTransport implements CompositeTransport {
public Response request(Command command) throws IOException { public Response request(Command command) throws IOException {
throw new AssertionError("Unsupported Method"); throw new AssertionError("Unsupported Method");
} }
public Response request(Command command,int timeout) throws IOException {
throw new AssertionError("Unsupported Method");
}
public void add(URI u[]) { public void add(URI u[]) {
for (int i = 0; i < u.length; i++) { for (int i = 0; i < u.length; i++) {

View File

@ -418,6 +418,10 @@ public class FanoutTransport implements CompositeTransport {
public Response request(Command command) throws IOException { public Response request(Command command) throws IOException {
throw new AssertionError("Unsupported Method"); throw new AssertionError("Unsupported Method");
} }
public Response request(Command command,int timeout) throws IOException {
throw new AssertionError("Unsupported Method");
}
public void reconnect() { public void reconnect() {
log.debug("Waking up reconnect task"); log.debug("Waking up reconnect task");

View File

@ -102,6 +102,10 @@ public class MockTransport extends DefaultTransportListener implements Transport
synchronized public Response request(Command command) throws IOException { synchronized public Response request(Command command) throws IOException {
return next.request(command); return next.request(command);
} }
public Response request(Command command,int timeout) throws IOException {
return next.request(command, timeout);
}
synchronized public void onException(IOException error) { synchronized public void onException(IOException error) {
transportListener.onException(error); transportListener.onException(error);

View File

@ -86,6 +86,10 @@ public class VMTransport implements Transport{
public Response request(Command command) throws IOException{ public Response request(Command command) throws IOException{
throw new AssertionError("Unsupported Method"); throw new AssertionError("Unsupported Method");
} }
public Response request(Command command,int timeout) throws IOException {
throw new AssertionError("Unsupported Method");
}
public synchronized TransportListener getTransportListener() { public synchronized TransportListener getTransportListener() {
return transportListener; return transportListener;