Add option connectResponseTimeout to allow a stuck connection in
ensureConnectionInfoSent from stalling out a client.  Timeout is
disabled by default.
This commit is contained in:
Timothy Bish 2016-07-18 11:30:38 -04:00
parent a3a5a1affa
commit f43c090809
3 changed files with 45 additions and 2 deletions

View File

@ -210,6 +210,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private List<String> trustedPackages = new ArrayList<String>();
private boolean trustAllPackages = false;
private int connectResponseTimeout;
/**
* Construct an <code>ActiveMQConnection</code>
@ -1496,7 +1497,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
info.setClientId(clientIdGenerator.generateId());
}
syncSendPacket(info.copy());
syncSendPacket(info.copy(), getConnectResponseTimeout());
this.isConnectionInfoSentToBroker = true;
// Add a temp destination advisory consumer so that
@ -2605,4 +2606,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void setTrustAllPackages(boolean trustAllPackages) {
this.trustAllPackages = trustAllPackages;
}
public int getConnectResponseTimeout() {
return connectResponseTimeout;
}
public void setConnectResponseTimeout(int connectResponseTimeout) {
this.connectResponseTimeout = connectResponseTimeout;
}
}

View File

@ -160,6 +160,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
private long warnAboutUnstartedConnectionTimeout = 500L;
private int sendTimeout = 0;
private int connectResponseTimeout = 0;
private boolean sendAcksAsync=true;
private TransportListener transportListener;
private ExceptionListener exceptionListener;
@ -421,6 +422,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled());
connection.setTrustedPackages(getTrustedPackages());
connection.setTrustAllPackages(isTrustAllPackages());
connection.setConnectResponseTimeout(getConnectResponseTimeout());
if (transportListener != null) {
connection.addTransportListener(transportListener);
}
@ -438,7 +440,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
//
// /////////////////////////////////////////////
public String getBrokerURL() {
public String getBrokerURL() {
return brokerURL == null ? null : brokerURL.toString();
}
@ -831,6 +833,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
props.setProperty("connectResponseTimeout", Integer.toString(getConnectResponseTimeout()));
props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
@ -1275,4 +1278,12 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
public void setTrustAllPackages(boolean trustAllPackages) {
this.trustAllPackages = trustAllPackages;
}
public int getConnectResponseTimeout() {
return connectResponseTimeout;
}
public void setConnectResponseTimeout(int connectResponseTimeout) {
this.connectResponseTimeout = connectResponseTimeout;
}
}

View File

@ -98,6 +98,29 @@ public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
assertEquals(5000, cf.getAuditDepth());
}
public void testConnectAttemptTimeotOptionIsApplied() throws URISyntaxException, JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
assertEquals(0, cf.getConnectResponseTimeout());
// the broker url have been adjusted.
assertEquals("vm://localhost", cf.getBrokerURL());
ActiveMQConnection connection = (ActiveMQConnection)cf.createConnection();
assertEquals(0, connection.getConnectResponseTimeout());
connection.close();
cf = new ActiveMQConnectionFactory("vm://localhost?jms.connectResponseTimeout=1000");
assertEquals(1000, cf.getConnectResponseTimeout());
// the broker url have been adjusted.
assertEquals("vm://localhost", cf.getBrokerURL());
connection = (ActiveMQConnection)cf.createConnection();
assertEquals(1000, connection.getConnectResponseTimeout());
connection.close();
}
public void testUseURIToConfigureRedeliveryPolicy() throws URISyntaxException, JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
"vm://localhost?broker.persistent=false&broker.useJmx=false&jms.redeliveryPolicy.maximumRedeliveries=2");