This commit is contained in:
Clebert Suconic 2017-06-27 12:19:38 -04:00
commit 4d6956c291
26 changed files with 77 additions and 65 deletions

View File

@ -122,7 +122,7 @@ public class Run extends LockAbstract {
if (file.exists()) {
try {
try {
server.exit();
server.stop(true);
} catch (Exception e) {
e.printStackTrace();
}
@ -143,7 +143,7 @@ public class Run extends LockAbstract {
@Override
public void run() {
try {
server.exit();
server.stop(true);
} catch (Exception e) {
e.printStackTrace();
}

View File

@ -124,23 +124,19 @@ public class FileBroker implements Broker {
@Override
public void stop() throws Exception {
stop(false);
}
@Override
public void exit() throws Exception {
stop(true);
}
private void stop(boolean isShutdown) throws Exception {
@Override
public void stop(boolean isShutdown) throws Exception {
if (!started) {
return;
}
ActiveMQComponent[] mqComponents = new ActiveMQComponent[components.size()];
components.values().toArray(mqComponents);
for (int i = mqComponents.length - 1; i >= 0; i--) {
if (mqComponents[i] instanceof ServiceComponent && isShutdown) {
((ServiceComponent) mqComponents[i]).exit();
if (mqComponents[i] instanceof ServiceComponent) {
((ServiceComponent) mqComponents[i]).stop(isShutdown);
} else {
mqComponents[i].stop();
}

View File

@ -22,5 +22,5 @@ package org.apache.activemq.artemis.core.server;
public interface ServiceComponent extends ActiveMQComponent {
//called by shutdown hooks before exit the VM
void exit() throws Exception;
void stop(boolean shutdown) throws Exception;
}

View File

@ -2216,7 +2216,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
server.stop(true);
server.fail(true);
}
@Override
@ -2251,7 +2251,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
liveOnlyPolicy.getScaleDownPolicy().getConnectors().add(0, connector);
}
server.stop(true);
server.fail(true);
}
}

View File

@ -381,7 +381,7 @@ public interface ActiveMQServer extends ServiceComponent {
*/
boolean isAddressBound(String address) throws Exception;
void stop(boolean failoverOnServerShutdown) throws Exception;
void fail(boolean failoverOnServerShutdown) throws Exception;
Queue updateQueue(String name,
RoutingType routingType,

View File

@ -340,7 +340,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public void stop() throws Exception {
internalStop(false);
ActiveMQServerImpl.this.stop(false);
}
@Override
@ -679,20 +679,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
public void exit() throws Exception {
internalStop(true);
public void stop() throws Exception {
stop(true);
}
@Override
public final void stop() throws Exception {
internalStop(false);
}
private void internalStop(boolean isExit) throws Exception {
public void stop(boolean isShutdown) throws Exception {
try {
stop(false, isExit);
stop(false, isShutdown);
} finally {
networkHealthCheck.stop();
if (isShutdown) networkHealthCheck.stop();
}
}
@ -832,7 +828,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
public final void stop(boolean failoverOnServerShutdown) throws Exception {
public final void fail(boolean failoverOnServerShutdown) throws Exception {
stop(failoverOnServerShutdown, false, false, false);
}
@ -866,7 +862,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
*
* @param criticalIOError whether we have encountered an IO error with the journal etc
*/
void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting, boolean isExit) {
void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting, boolean isShutdown) {
synchronized (this) {
if (state == SERVER_STATE.STOPPED || state == SERVER_STATE.STOPPING) {
@ -1055,8 +1051,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
for (ActiveMQComponent externalComponent : externalComponents) {
try {
if (isExit && externalComponent instanceof ServiceComponent) {
((ServiceComponent)externalComponent).exit();
if (externalComponent instanceof ServiceComponent) {
((ServiceComponent)externalComponent).stop(isShutdown);
} else {
externalComponent.stop();
}

View File

@ -180,7 +180,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
clusterConnection.addClusterTopologyListener(listener1);
if (listener1.waitForBackup()) {
//if we have to many backups kept or are not configured to restart just stop, otherwise restart as a backup
activeMQServer.stop(true);
activeMQServer.fail(true);
ActiveMQServerLogger.LOGGER.restartingReplicatedBackupAfterFailback();
// activeMQServer.moveServerData(replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize());
activeMQServer.setHAPolicy(replicatedPolicy.getReplicaPolicy());

View File

@ -81,7 +81,7 @@ public class EmbeddedServerTest {
server.addExternalComponent(normalComponent);
server.addExternalComponent(serviceComponent);
server.stop();
server.stop(false);
assertTrue(normalComponent.stopCalled);
assertTrue(serviceComponent.stopCalled);
@ -91,7 +91,7 @@ public class EmbeddedServerTest {
serviceComponent.resetFlags();
server.start();
server.exit();
server.stop();
assertTrue(normalComponent.stopCalled);
assertFalse(serviceComponent.stopCalled);
@ -129,8 +129,12 @@ public class EmbeddedServerTest {
volatile boolean exitCalled;
@Override
public void exit() throws Exception {
exitCalled = true;
public void stop(boolean isShutdown) throws Exception {
if (isShutdown) {
exitCalled = true;
} else {
stop();
}
}
@Override

View File

@ -2358,7 +2358,7 @@ public abstract class ActiveMQTestBase extends Assert {
clusterManager.flushExecutor();
clusterManager.clear();
Assert.assertTrue("server should be running!", server.isStarted());
server.stop(true);
server.fail(true);
if (sessions.length > 0) {
// Wait to be informed of failure

View File

@ -191,10 +191,6 @@ public class WebServerComponent implements ExternalComponent {
}
@Override
public void exit() throws Exception {
stop(true);
}
public void stop(boolean isShutdown) throws Exception {
if (isShutdown) {
internalStop();

View File

@ -71,7 +71,7 @@ public class BMFailoverTest extends FailoverTestBase {
public static void stopAndThrow() throws ActiveMQUnBlockedException {
if (!stopped) {
try {
serverToStop.getServer().stop(true);
serverToStop.getServer().fail(true);
} catch (Exception e) {
e.printStackTrace();
}

View File

@ -233,7 +233,7 @@ public class LargeMessageOverReplicationTest extends ActiveMQTestBase {
try {
if (messageChunkCount == 10) {
liveServer.stop(true);
liveServer.fail(true);
System.err.println("activating");
if (!backupServer.waitForActivation(1, TimeUnit.MINUTES)) {

View File

@ -221,7 +221,7 @@ public class RaceOnSyncLargeMessageOverReplication2Test extends ActiveMQTestBase
waitForRemoteBackup(connection.getSessionFactory(), 30);
liveServer.stop(true);
liveServer.fail(true);
Assert.assertTrue(failedOver.await(10, TimeUnit.SECONDS));

View File

@ -200,7 +200,7 @@ public class RaceOnSyncLargeMessageOverReplicationTest extends ActiveMQTestBase
Assert.assertFalse(t.isAlive());
liveServer.stop(true);
liveServer.fail(true);
Assert.assertTrue(failedOver.await(10, TimeUnit.SECONDS));

View File

@ -92,7 +92,7 @@ public class StartStopDeadlockTest extends ActiveMQTestBase {
align.countDown();
startLatch.await();
System.out.println("Crashing....");
serverLive.stop(true);
serverLive.fail(true);
} catch (Exception e) {
errors.incrementAndGet();
e.printStackTrace();

View File

@ -129,7 +129,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
// ((ServerSessionImpl) srvSession).clearLargeMessage();
// }
server.stop(false);
server.fail(false);
ActiveMQTestBase.forceGC();
@ -307,7 +307,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
session.rollback();
}
server.stop(false);
server.fail(false);
server.start();
server.stop();
@ -385,7 +385,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
sf.close();
}
server.stop(false);
server.fail(false);
server.start();
validateNoFilesOnLargeDir();
@ -443,7 +443,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
session.close();
sf.close();
server.stop(false);
server.fail(false);
server.start();
for (int start = 0; start < 2; start++) {
@ -635,7 +635,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
}
((ActiveMQServerImpl) server).replaceQueueFactory(original);
server.stop(false);
server.fail(false);
server.start();
server.stop();
@ -698,7 +698,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
}
((ActiveMQServerImpl) server).replaceQueueFactory(original);
server.stop(false);
server.fail(false);
server.start();
server.stop();

View File

@ -155,7 +155,7 @@ public class BridgeReconnectTest extends BridgeTestBase {
startServers();
waitForServerStart(server0);
server0.stop(true);
server0.fail(true);
waitForServerStart(server2);
@ -206,7 +206,7 @@ public class BridgeReconnectTest extends BridgeTestBase {
BridgeReconnectTest.log.info("** failing connection");
// Now we will simulate a failure of the bridge connection between server0 and server1
server0.stop(true);
server0.fail(true);
waitForServerStart(server2);
@ -285,7 +285,7 @@ public class BridgeReconnectTest extends BridgeTestBase {
startServers();
// Now we will simulate a failure of the bridge connection between server0 and server1
server0.stop(true);
server0.fail(true);
locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(server2tc)).setReconnectAttempts(100);
ClientSessionFactory csf0 = addSessionFactory(locator.createSessionFactory(server2tc));

View File

@ -1455,7 +1455,7 @@ public class BridgeTest extends ActiveMQTestBase {
try {
System.out.println("Stopping server");
latch.countDown();
serverToStop.stop(false);
serverToStop.fail(false);
} catch (Exception e) {
e.printStackTrace();
}

View File

@ -93,7 +93,7 @@ public abstract class GroupingFailoverTestBase extends ClusterTestBase {
closeSessionFactory(0);
servers[0].stop(true);
servers[0].fail(true);
waitForServerRestart(2);
@ -194,7 +194,7 @@ public abstract class GroupingFailoverTestBase extends ClusterTestBase {
closeSessionFactory(0);
servers[0].stop(true);
servers[0].fail(true);
waitForServerRestart(2);

View File

@ -26,14 +26,18 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.tests.util.Wait;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class NetworkIsolationReplicationTest extends FailoverTestBase {
public class NetworkReplicationTest extends FailoverTestBase {
private static final Logger logger = Logger.getLogger(NetworkIsolationReplicationTest.class);
private static final Logger logger = Logger.getLogger(NetworkReplicationTest.class);
// This address is guaranteed to fail... reserved for documentation https://tools.ietf.org/html/rfc5737
private static final String badAddress = "203.0.113.1";
@Before
@Override
@ -59,6 +63,22 @@ public class NetworkIsolationReplicationTest extends FailoverTestBase {
return addClientSession(sf1.createSession(xa, autoCommitSends, autoCommitAcks));
}
@Test
public void testReactivate() throws Exception {
liveServer.getServer().getConfiguration().setNetworkCheckPeriod(100).setNetworkCheckTimeout(200);
liveServer.start();
Assert.assertTrue(Wait.waitFor(liveServer::isActive));
liveServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName(badAddress));
Assert.assertTrue(Wait.waitFor(() -> !liveServer.isStarted()));
liveServer.getServer().getNetworkHealthCheck().clearAddresses();
Assert.assertTrue(Wait.waitFor(liveServer::isStarted));
}
@Test
public void testDoNotActivateOnIsolation() throws Exception {
AssertionLoggerHandler.startCapture();
@ -81,7 +101,7 @@ public class NetworkIsolationReplicationTest extends FailoverTestBase {
backupServer.getServer().getNetworkHealthCheck().clearAddresses();
}
backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("203.0.113.1"));
backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName(badAddress));
backupServer.getServer().start();
ClientSessionFactory sf = addSessionFactory(locator.createSessionFactory());
@ -121,7 +141,7 @@ public class NetworkIsolationReplicationTest extends FailoverTestBase {
backupServer.stop();
liveServer.stop();
liveServer.getServer().getConfiguration().setNetworkCheckList("203.0.113.1").
liveServer.getServer().getConfiguration().setNetworkCheckList(badAddress).
setNetworkCheckPeriod(100).setNetworkCheckTimeout(100);
try {

View File

@ -92,7 +92,7 @@ public class SameProcessActiveMQServer implements TestableServer {
clusterManager.flushExecutor();
clusterManager.clear();
Assert.assertTrue("server should be running!", server.isStarted());
server.stop(failover);
server.fail(failover);
if (waitFailure) {
// Wait to be informed of failure

View File

@ -220,7 +220,7 @@ public class ReplicationWithDivertTest extends ActiveMQTestBase {
}
Assert.assertFalse(t.isAlive());
liveServer.stop(true);
liveServer.fail(true);
Assert.assertTrue(failedOver.await(10, TimeUnit.SECONDS));
{

View File

@ -257,7 +257,7 @@ public class JMSUtil {
ClusterManager clusterManager = server.getClusterManager();
clusterManager.clear();
server.stop(true);
server.fail(true);
// Wait to be informed of failure
boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);

View File

@ -1886,7 +1886,7 @@ public class MQTTTest extends MQTTTestSupport {
connection1.connect();
connection1.publish(address, payload.getBytes(), QoS.AT_LEAST_ONCE, true);
getServer().stop(false);
getServer().fail(false);
getServer().start();
waitForServerToStart(getServer());

View File

@ -73,7 +73,7 @@ public class PagingWithFailoverAndCountersTest extends ActiveMQTestBase {
if (inProcessBackup != null) {
try {
inProcessBackup.getServer().stop(false);
inProcessBackup.getServer().fail(false);
} catch (Throwable ignored) {
ignored.printStackTrace();
}

View File

@ -47,7 +47,7 @@ public class SimpleStartStopTest extends ActiveMQTestBase {
for (int i = 0; i < 50; i++) {
server = createServer(true, false);
server.start();
server.stop(false);
server.fail(false);
}
// There shouldn't be any error from starting / stopping the server