ARTEMIS-2046 Fixing issues with JournalStorageManager.stop in replication, JDBC and shared storage

This commit is contained in:
Clebert Suconic 2018-08-17 16:45:07 -04:00
parent f1dfc7281b
commit 63e6cd98f8
11 changed files with 409 additions and 41 deletions

View File

@ -249,6 +249,12 @@ public enum ActiveMQExceptionType {
public ActiveMQException createException(String msg) {
return new ActiveMQNullRefException(msg);
}
},
SHUTDOWN_ERROR(219) {
@Override
public ActiveMQException createException(String msg) {
return new ActiveMQShutdownException(msg);
}
};
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core;
/**
* An operation failed because an address exists on the server.
*/
public final class ActiveMQShutdownException extends ActiveMQException {
public ActiveMQShutdownException() {
super(ActiveMQExceptionType.SHUTDOWN_ERROR);
}
public ActiveMQShutdownException(String msg) {
super(ActiveMQExceptionType.SHUTDOWN_ERROR, msg);
}
}

View File

@ -1500,9 +1500,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
XAException xaException = null;
if (onePhase) {
logger.debug("Throwing oneFase RMFAIL on xid=" + xid, t);
//we must return XA_RMFAIL
xaException = new XAException(XAException.XAER_RMFAIL);
} else {
logger.debug("Throwing twoFase Retry on xid=" + xid, t);
// Any error on commit -> RETRY
// We can't rollback a Prepared TX for definition
xaException = new XAException(XAException.XA_RETRY);
@ -1753,7 +1755,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
} catch (XAException xae) {
throw xae;
} catch (ActiveMQException e) {
if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) {
if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT || e.getType() == ActiveMQExceptionType.SHUTDOWN_ERROR) {
// Unblocked on failover
throw new XAException(XAException.XA_RETRY);
}

View File

@ -31,7 +31,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncoderPersister;
@ -334,19 +336,19 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
}
private void checkStatus() {
private void checkStatus() throws Exception {
checkStatus(null);
}
private void checkStatus(IOCompletion callback) {
private void checkStatus(IOCompletion callback) throws Exception {
if (!started) {
if (callback != null) callback.onError(-1, "JDBC Journal is not loaded");
throw new IllegalStateException("JDBCJournal is not loaded");
throw new ActiveMQShutdownException("JDBCJournal is not loaded");
}
if (failed.get()) {
if (callback != null) callback.onError(-1, "JDBC Journal failed");
throw new IllegalStateException("JDBCJournal Failed");
throw new ActiveMQException("JDBCJournal Failed");
}
}
@ -388,7 +390,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
if (callback != null) callback.waitCompletion();
}
private synchronized void addTxRecord(JDBCJournalRecord record) {
private synchronized void addTxRecord(JDBCJournalRecord record) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("addTxRecord " + record + ", started=" + started + ", failed=" + failed);

View File

@ -48,6 +48,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
@ -823,6 +824,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
usedFile);
}
result.set(true);
} catch (ActiveMQShutdownException e) {
result.fail(e);
logger.error("appendPrepareRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
setErrorCondition(callback, null, e);
@ -882,7 +886,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
result.set(true);
} catch (Exception e) {
} catch (ActiveMQShutdownException e) {
result.fail(e);
logger.error("appendUpdateRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
setErrorCondition(callback, null, e);
logger.error("appendUpdateRecord:" + e, e);
@ -933,7 +940,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
record.delete(usedFile);
}
result.set(true);
} catch (Exception e) {
} catch (ActiveMQShutdownException e) {
result.fail(e);
logger.error("appendDeleteRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
logger.error("appendDeleteRecord:" + e, e);
} finally {
@ -993,7 +1003,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
} catch (Exception e) {
} catch (Throwable e) {
logger.error("appendAddRecordTransactional:" + e, e);
setErrorCondition(null, tx, e);
} finally {
@ -1031,9 +1041,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
private void checkJournalIsLoaded() {
private void checkJournalIsLoaded() throws Exception {
if (state != JournalState.LOADED && state != JournalState.SYNCING) {
throw new IllegalStateException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]");
throw new ActiveMQShutdownException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]");
}
}
@ -1085,7 +1095,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() );
} catch ( Exception e ) {
} catch (Throwable e ) {
logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e );
setErrorCondition(null, tx, e );
} finally {
@ -1132,7 +1142,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
tx.addNegative(usedFile, id);
} catch (Exception e) {
} catch (Throwable e) {
logger.error("appendDeleteRecordTransactional:" + e, e);
setErrorCondition(null, tx, e);
} finally {
@ -1185,7 +1195,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
tx.prepare(usedFile);
} catch (Exception e) {
} catch (ActiveMQShutdownException e) {
result.fail(e);
logger.error("appendPrepareRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
logger.error("appendPrepareRecord:" + e, e);
setErrorCondition(callback, tx, e);
@ -1267,6 +1280,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.commit(usedFile);
} catch (ActiveMQShutdownException e) {
result.fail(e);
logger.error("appendCommitRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
logger.error("appendCommitRecord:" + e, e);
@ -1317,6 +1333,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
tx.rollback(usedFile);
} catch (ActiveMQShutdownException e) {
result.fail(e);
logger.error("appendRollbackRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
logger.error("appendRollbackRecord:" + e, e);
@ -2360,10 +2379,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
return;
}
setJournalState(JournalState.STOPPED);
flush();
setJournalState(JournalState.STOPPED);
if (providedIOThreadPool == null) {
threadPool.shutdown();
@ -2681,6 +2700,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final JournalTransaction tx,
final IOCallback parameterCallback) throws Exception {
checkJournalIsLoaded();
final IOCallback callback;
final int size = encoder.getEncodeSize();

View File

@ -126,8 +126,10 @@ import org.jboss.logging.Logger;
*/
public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements StorageManager {
private static final int CRITICAL_PATHS = 1;
private static final int CRITICAL_STORE = 0;
protected static final int CRITICAL_PATHS = 3;
protected static final int CRITICAL_STORE = 0;
protected static final int CRITICAL_STOP = 1;
protected static final int CRITICAL_STOP_2 = 2;
private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class);
@ -405,6 +407,16 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
leaveCritical(CRITICAL_STORE);
}
/** for internal use and testsuite, don't use it outside of tests */
public void writeLock() {
storageManagerLock.writeLock().lock();
}
/** for internal use and testsuite, don't use it outside of tests */
public void writeUnlock() {
storageManagerLock.writeLock().unlock();
}
@Override
public void storeAcknowledge(final long queueID, final long messageID) throws Exception {
readLock();

View File

@ -229,9 +229,21 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
@Override
public synchronized void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
try {
enterCritical(CRITICAL_STOP);
synchronized (this) {
if (internalStop(ioCriticalError, sendFailover))
return;
}
} finally {
leaveCritical(CRITICAL_STOP);
}
}
private boolean internalStop(boolean ioCriticalError, boolean sendFailover) throws Exception {
if (!started) {
return;
return true;
}
if (!ioCriticalError) {
@ -255,30 +267,41 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
// that's ok
}
// We cache the variable as the replicator could be changed between here and the time we call stop
// since sendLiveIsStopping may issue a close back from the channel
// and we want to ensure a stop here just in case
ReplicationManager replicatorInUse = replicator;
if (replicatorInUse != null) {
if (sendFailover) {
final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER);
if (token != null) {
try {
token.waitCompletion(5000);
} catch (Exception e) {
// ignore it
enterCritical(CRITICAL_STOP_2);
storageManagerLock.writeLock().lock();
try {
// We cache the variable as the replicator could be changed between here and the time we call stop
// since sendLiveIsStopping may issue a close back from the channel
// and we want to ensure a stop here just in case
ReplicationManager replicatorInUse = replicator;
if (replicatorInUse != null) {
if (sendFailover) {
final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER);
if (token != null) {
try {
token.waitCompletion(5000);
} catch (Exception e) {
// ignore it
}
}
}
// we cannot clear replication tokens, otherwise clients will eventually be informed of completion during a server's shutdown
// while the backup will never receive then
replicatorInUse.stop(false);
}
replicatorInUse.stop();
bindingsJournal.stop();
messageJournal.stop();
journalLoaded = false;
started = false;
} finally {
storageManagerLock.writeLock().unlock();
leaveCritical(CRITICAL_STOP_2);
}
bindingsJournal.stop();
messageJournal.stop();
journalLoaded = false;
started = false;
return false;
}
/**

View File

@ -150,6 +150,20 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
journals[id] = journal;
}
/**
* This is for tests basically, do not use it as its API is not guaranteed for future usage.
*/
public void pause() {
started = false;
}
/**
* This is for tests basically, do not use it as its API is not guaranteed for future usage.
*/
public void resume() {
started = true;
}
@Override
public void handlePacket(final Packet packet) {
if (logger.isTraceEnabled()) {

View File

@ -282,6 +282,10 @@ public final class ReplicationManager implements ActiveMQComponent {
@Override
public void stop() throws Exception {
stop(true);
}
public void stop(boolean clearTokens) throws Exception {
synchronized (this) {
if (!started) {
logger.trace("Stopping being ignored as it hasn't been started");
@ -297,7 +301,10 @@ public final class ReplicationManager implements ActiveMQComponent {
enabled = false;
writable.set(true);
clearReplicationTokens();
if (clearTokens) {
clearReplicationTokens();
}
RemotingConnection toStop = remotingConnection;
if (toStop != null) {

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.junit.Assert;
import org.junit.Test;
public class NettyReplicationStopTest extends FailoverTestBase {
@Override
protected TestableServer createTestableServer(Configuration config) {
return new SameProcessActiveMQServer(createServer(true, config));
}
@Override
protected void createConfigs() throws Exception {
createReplicatedConfigs();
}
@Override
protected NodeManager createNodeManager() throws Exception {
return new InVMNodeManager(false);
}
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
return getNettyAcceptorTransportConfiguration(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
return getNettyConnectorTransportConfiguration(live);
}
@Override
protected final void crash(boolean waitFailure, ClientSession... sessions) throws Exception {
if (sessions.length > 0) {
for (ClientSession session : sessions) {
waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer());
}
} else {
waitForRemoteBackup(null, 5, true, backupServer.getServer());
}
super.crash(waitFailure, sessions);
}
@Override
protected final void crash(ClientSession... sessions) throws Exception {
crash(true, sessions);
}
@Test
public void testReplicaStop() throws Exception {
Map<String, Object> params = new HashMap<>();
params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");
TransportConfiguration tc = createTransportConfiguration(true, false, params);
ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(15);
final ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = sf.createSession(true, true);
session.createQueue(ADDRESS, ADDRESS, null, true);
ClientProducer producer = session.createProducer(ADDRESS);
final int numMessages = 10;
ReplicationEndpoint endpoint = backupServer.getServer().getReplicationEndpoint();
endpoint.pause();
ArrayList<Thread> threads = new ArrayList<>();
final ArrayList<Integer> codesSent = new ArrayList<>();
CountDownLatch alignedOnSend = new CountDownLatch(10);
for (int i = 0; i < numMessages; i++) {
final int code = i;
Thread t = new Thread("WillSend " + code) {
@Override
public void run() {
try {
ClientSession session = sf.createSession(true, true);
ClientProducer producer = session.createProducer(ADDRESS);
ClientMessage message = session.createMessage(true).putIntProperty("i", code);
alignedOnSend.countDown();
System.out.println("blocking!!");
producer.send(message);
codesSent.add(code);
System.out.println("Sent!");
} catch (Exception e) {
// that's ok;
e.printStackTrace(); // logging just for debug & reference
}
}
};
t.start();
threads.add(t);
}
Assert.assertTrue(alignedOnSend.await(10, TimeUnit.SECONDS));
liveServer.stop();
Assert.assertEquals(0, codesSent.size());
}
}

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -364,6 +365,105 @@ public class BasicXaTest extends ActiveMQTestBase {
}
@Test
public void testPrepareError() throws Exception {
Xid xid = newXID();
ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
ClientProducer clientProducer = clientSession2.createProducer(atestq);
ClientMessage m1 = createTextMessage(clientSession2, "m1");
ClientMessage m2 = createTextMessage(clientSession2, "m2");
ClientMessage m3 = createTextMessage(clientSession2, "m3");
ClientMessage m4 = createTextMessage(clientSession2, "m4");
clientProducer.send(m1);
clientProducer.send(m2);
clientProducer.send(m3);
clientProducer.send(m4);
clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
ClientMessage m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
clientSession.end(xid, XAResource.TMSUCCESS);
StorageManager journalStorageManager = messagingService.getStorageManager();
clientSession.prepare(xid);
journalStorageManager.getMessageJournal().stop();
try {
clientSession.commit(xid, false);
Assert.fail("Exception exptected");
} catch (XAException e) {
Assert.assertTrue(e.errorCode == XAException.XA_RETRY);
}
}
@Test
public void testRollbackError() throws Exception {
Xid xid = newXID();
ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
ClientProducer clientProducer = clientSession2.createProducer(atestq);
ClientMessage m1 = createTextMessage(clientSession2, "m1");
ClientMessage m2 = createTextMessage(clientSession2, "m2");
ClientMessage m3 = createTextMessage(clientSession2, "m3");
ClientMessage m4 = createTextMessage(clientSession2, "m4");
clientProducer.send(m1);
clientProducer.send(m2);
clientProducer.send(m3);
clientProducer.send(m4);
clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
ClientMessage m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
clientSession.end(xid, XAResource.TMSUCCESS);
StorageManager journalStorageManager = messagingService.getStorageManager();
clientSession.prepare(xid);
journalStorageManager.getMessageJournal().stop();
try {
clientSession.rollback(xid);
Assert.fail("Exception exptected");
} catch (XAException e) {
Assert.assertTrue(e.errorCode == XAException.XA_RETRY);
}
}
@Test
public void testReceiveRollback() throws Exception {
int numSessions = 100;