mirror of https://github.com/apache/activemq.git
resolve hang in purge if message count stats are off and log a warning.. tidy up broker service waitForStarted logic which caused AMQ2102 test to block.. still issue with master/slave sync with AMQ2102
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@936798 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
eb983f7c08
commit
956c3e0df2
|
@ -689,10 +689,10 @@ public class BrokerService implements Service {
|
|||
}
|
||||
|
||||
/**
|
||||
* A helper method to block the caller thread until the broker has been
|
||||
* started
|
||||
* A helper method to block the caller thread until the broker has fully started
|
||||
* @return boolean true if wait succeeded false if broker was not started or was stopped
|
||||
*/
|
||||
public void waitUntilStarted() {
|
||||
public boolean waitUntilStarted() {
|
||||
boolean waitSucceeded = false;
|
||||
while (isStarted() && !stopped.get() && !waitSucceeded) {
|
||||
try {
|
||||
|
@ -700,6 +700,7 @@ public class BrokerService implements Service {
|
|||
} catch (InterruptedException ignore) {
|
||||
}
|
||||
}
|
||||
return waitSucceeded;
|
||||
}
|
||||
|
||||
// Properties
|
||||
|
|
|
@ -940,8 +940,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
} catch (IOException e) {
|
||||
}
|
||||
}
|
||||
|
||||
} while (!pagedInMessages.isEmpty() || this.destinationStatistics.getMessages().getCount() > 0);
|
||||
// don't spin/hang if stats are out and there is nothing left in the store
|
||||
} while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
|
||||
if (this.destinationStatistics.getMessages().getCount() > 0) {
|
||||
LOG.warn(getActiveMQDestination().getQualifiedName() + " after purge complete, message count stats report: " + this.destinationStatistics.getMessages().getCount());
|
||||
}
|
||||
gc();
|
||||
this.destinationStatistics.getMessages().setCount(0);
|
||||
getMessages().clear();
|
||||
|
|
|
@ -186,7 +186,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
|||
*/
|
||||
@Override
|
||||
public String toString() {
|
||||
return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
|
||||
return "" + (socket.isConnected() ? "tcp://" + socket.getInetAddress() + ":" + socket.getPort()
|
||||
: (localLocation != null ? localLocation : remoteLocation)) ;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -397,7 +397,7 @@ public class AMQ2102Test extends CombinationTestSupport implements UncaughtExcep
|
|||
}
|
||||
|
||||
private static void debug(String message) {
|
||||
LOG.debug(message);
|
||||
LOG.info(message);
|
||||
}
|
||||
|
||||
private static void info(String message) {
|
||||
|
@ -464,7 +464,6 @@ public class AMQ2102Test extends CombinationTestSupport implements UncaughtExcep
|
|||
}
|
||||
};
|
||||
t.start();
|
||||
master.waitUntilStarted();
|
||||
masterUrl = master.getTransportConnectors().get(0).getConnectUri().toString();
|
||||
|
||||
debug("masterUrl: " + masterUrl);
|
||||
|
@ -475,6 +474,7 @@ public class AMQ2102Test extends CombinationTestSupport implements UncaughtExcep
|
|||
slave.setMasterConnectorURI(masterUrl);
|
||||
slave.start();
|
||||
slave.waitUntilStarted();
|
||||
assertTrue("master started", master.waitUntilStarted());
|
||||
}
|
||||
|
||||
public void tearDown() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue