Robert Davies 2008-02-07 09:51:19 +00:00
parent cada61f3ca
commit 6c52d28985
2 changed files with 68 additions and 5 deletions

View File

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
import java.io.IOException;
@ -74,6 +75,13 @@ public class ConnectionPool {
public void transportResumed() {
}
});
//
// make sure that we set the hasFailed flag, in case the transport already failed
// prior to the addition of our new TransportListener
//
if(connection.isTransportFailed()) {
hasFailed = true;
}
}
public ConnectionPool(ActiveMQConnection connection, Map<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory,
@ -218,5 +226,4 @@ public class ConnectionPool {
}
}
}
}

View File

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.failover;
import java.io.IOException;
@ -29,6 +30,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.state.ConnectionStateTracker;
import org.apache.activemq.state.Tracked;
import org.apache.activemq.thread.DefaultThreadPools;
@ -176,7 +178,12 @@ public class FailoverTransport implements CompositeTransport {
transportListener.transportInterupted();
}
synchronized (reconnectMutex) {
boolean reconnectOk = false;
if(started) {
LOG.warn("Transport failed, attempting to automatically reconnect due to: " + e, e);
reconnectOk = true;
}
if (connectedTransport != null) {
initialized = false;
ServiceSupport.dispose(connectedTransport);
@ -185,9 +192,12 @@ public class FailoverTransport implements CompositeTransport {
connectedTransportURI = null;
connected=false;
}
if(reconnectOk) {
reconnectTask.wakeup();
}
}
}
public void start() throws Exception {
synchronized (reconnectMutex) {
@ -308,12 +318,41 @@ public class FailoverTransport implements CompositeTransport {
this.backupPoolSize = backupPoolSize;
}
/*
* BEGIN Patch segment for issue AMQ-1116
*/
/**
* @return Returns true if the command is one sent when a connection
* is being closed.
*/
private boolean isShutdownCommand(Command command) {
return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo));
}
/*
* END Patch segment
*/
public void oneway(Object o) throws IOException {
Command command = (Command)o;
Exception error = null;
try {
synchronized (reconnectMutex) {
if (isShutdownCommand(command) && connectedTransport == null) {
if(command.isShutdownInfo()) {
// Skipping send of ShutdownInfo command when not connected.
return;
}
if(command instanceof RemoveInfo) {
// Simulate response to RemoveInfo command
Response response = new Response();
response.setCorrelationId(command.getCommandId());
myTransportListener.onCommand(response);
return;
}
}
// Keep trying until the message is sent.
for (int i = 0; !disposed; i++) {
try {
@ -609,11 +648,28 @@ public class FailoverTransport implements CompositeTransport {
if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
connectionFailure = failure;
// Make sure on initial startup, that the transportListener has been initialized
// for this instance.
while(transportListener == null) {
try {
Thread.sleep(100);
}
catch(InterruptedException iEx) {}
}
if(transportListener != null) {
if (connectionFailure instanceof IOException) {
transportListener.onException((IOException)connectionFailure);
} else {
transportListener.onException(IOExceptionSupport.create(connectionFailure));
}
}
reconnectMutex.notifyAll();
return false;
}
}
if (!disposed) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");