This closes #2255
This commit is contained in:
commit
281cff3d41
|
@ -249,6 +249,12 @@ public enum ActiveMQExceptionType {
|
||||||
public ActiveMQException createException(String msg) {
|
public ActiveMQException createException(String msg) {
|
||||||
return new ActiveMQNullRefException(msg);
|
return new ActiveMQNullRefException(msg);
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
SHUTDOWN_ERROR(219) {
|
||||||
|
@Override
|
||||||
|
public ActiveMQException createException(String msg) {
|
||||||
|
return new ActiveMQShutdownException(msg);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;
|
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.api.core;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An operation failed because an address exists on the server.
|
||||||
|
*/
|
||||||
|
public final class ActiveMQShutdownException extends ActiveMQException {
|
||||||
|
|
||||||
|
public ActiveMQShutdownException() {
|
||||||
|
super(ActiveMQExceptionType.SHUTDOWN_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ActiveMQShutdownException(String msg) {
|
||||||
|
super(ActiveMQExceptionType.SHUTDOWN_ERROR, msg);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1500,9 +1500,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
||||||
|
|
||||||
XAException xaException = null;
|
XAException xaException = null;
|
||||||
if (onePhase) {
|
if (onePhase) {
|
||||||
|
logger.debug("Throwing oneFase RMFAIL on xid=" + xid, t);
|
||||||
//we must return XA_RMFAIL
|
//we must return XA_RMFAIL
|
||||||
xaException = new XAException(XAException.XAER_RMFAIL);
|
xaException = new XAException(XAException.XAER_RMFAIL);
|
||||||
} else {
|
} else {
|
||||||
|
logger.debug("Throwing twoFase Retry on xid=" + xid, t);
|
||||||
// Any error on commit -> RETRY
|
// Any error on commit -> RETRY
|
||||||
// We can't rollback a Prepared TX for definition
|
// We can't rollback a Prepared TX for definition
|
||||||
xaException = new XAException(XAException.XA_RETRY);
|
xaException = new XAException(XAException.XA_RETRY);
|
||||||
|
@ -1753,7 +1755,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
||||||
} catch (XAException xae) {
|
} catch (XAException xae) {
|
||||||
throw xae;
|
throw xae;
|
||||||
} catch (ActiveMQException e) {
|
} catch (ActiveMQException e) {
|
||||||
if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) {
|
if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT || e.getType() == ActiveMQExceptionType.SHUTDOWN_ERROR) {
|
||||||
// Unblocked on failover
|
// Unblocked on failover
|
||||||
throw new XAException(XAException.XA_RETRY);
|
throw new XAException(XAException.XA_RETRY);
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,9 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
|
||||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.journal.EncoderPersister;
|
import org.apache.activemq.artemis.core.journal.EncoderPersister;
|
||||||
|
@ -334,19 +336,19 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private void checkStatus() {
|
private void checkStatus() throws Exception {
|
||||||
checkStatus(null);
|
checkStatus(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkStatus(IOCompletion callback) {
|
private void checkStatus(IOCompletion callback) throws Exception {
|
||||||
if (!started) {
|
if (!started) {
|
||||||
if (callback != null) callback.onError(-1, "JDBC Journal is not loaded");
|
if (callback != null) callback.onError(-1, "JDBC Journal is not loaded");
|
||||||
throw new IllegalStateException("JDBCJournal is not loaded");
|
throw new ActiveMQShutdownException("JDBCJournal is not loaded");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (failed.get()) {
|
if (failed.get()) {
|
||||||
if (callback != null) callback.onError(-1, "JDBC Journal failed");
|
if (callback != null) callback.onError(-1, "JDBC Journal failed");
|
||||||
throw new IllegalStateException("JDBCJournal Failed");
|
throw new ActiveMQException("JDBCJournal Failed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -388,7 +390,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
if (callback != null) callback.waitCompletion();
|
if (callback != null) callback.waitCompletion();
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void addTxRecord(JDBCJournalRecord record) {
|
private synchronized void addTxRecord(JDBCJournalRecord record) throws Exception {
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("addTxRecord " + record + ", started=" + started + ", failed=" + failed);
|
logger.trace("addTxRecord " + record + ", started=" + started + ", failed=" + failed);
|
||||||
|
|
|
@ -48,6 +48,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||||
|
@ -823,6 +824,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
usedFile);
|
usedFile);
|
||||||
}
|
}
|
||||||
result.set(true);
|
result.set(true);
|
||||||
|
} catch (ActiveMQShutdownException e) {
|
||||||
|
result.fail(e);
|
||||||
|
logger.error("appendPrepareRecord:" + e, e);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
result.fail(e);
|
result.fail(e);
|
||||||
setErrorCondition(callback, null, e);
|
setErrorCondition(callback, null, e);
|
||||||
|
@ -882,7 +886,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
}
|
}
|
||||||
|
|
||||||
result.set(true);
|
result.set(true);
|
||||||
} catch (Exception e) {
|
} catch (ActiveMQShutdownException e) {
|
||||||
|
result.fail(e);
|
||||||
|
logger.error("appendUpdateRecord:" + e, e);
|
||||||
|
} catch (Throwable e) {
|
||||||
result.fail(e);
|
result.fail(e);
|
||||||
setErrorCondition(callback, null, e);
|
setErrorCondition(callback, null, e);
|
||||||
logger.error("appendUpdateRecord:" + e, e);
|
logger.error("appendUpdateRecord:" + e, e);
|
||||||
|
@ -933,7 +940,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
record.delete(usedFile);
|
record.delete(usedFile);
|
||||||
}
|
}
|
||||||
result.set(true);
|
result.set(true);
|
||||||
} catch (Exception e) {
|
} catch (ActiveMQShutdownException e) {
|
||||||
|
result.fail(e);
|
||||||
|
logger.error("appendDeleteRecord:" + e, e);
|
||||||
|
} catch (Throwable e) {
|
||||||
result.fail(e);
|
result.fail(e);
|
||||||
logger.error("appendDeleteRecord:" + e, e);
|
logger.error("appendDeleteRecord:" + e, e);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -993,7 +1003,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
}
|
}
|
||||||
|
|
||||||
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
|
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
|
||||||
} catch (Exception e) {
|
} catch (Throwable e) {
|
||||||
logger.error("appendAddRecordTransactional:" + e, e);
|
logger.error("appendAddRecordTransactional:" + e, e);
|
||||||
setErrorCondition(null, tx, e);
|
setErrorCondition(null, tx, e);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -1031,9 +1041,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkJournalIsLoaded() {
|
private void checkJournalIsLoaded() throws Exception {
|
||||||
if (state != JournalState.LOADED && state != JournalState.SYNCING) {
|
if (state != JournalState.LOADED && state != JournalState.SYNCING) {
|
||||||
throw new IllegalStateException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]");
|
throw new ActiveMQShutdownException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1085,7 +1095,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
}
|
}
|
||||||
|
|
||||||
tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() );
|
tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() );
|
||||||
} catch ( Exception e ) {
|
} catch (Throwable e ) {
|
||||||
logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e );
|
logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e );
|
||||||
setErrorCondition(null, tx, e );
|
setErrorCondition(null, tx, e );
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -1132,7 +1142,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
}
|
}
|
||||||
|
|
||||||
tx.addNegative(usedFile, id);
|
tx.addNegative(usedFile, id);
|
||||||
} catch (Exception e) {
|
} catch (Throwable e) {
|
||||||
logger.error("appendDeleteRecordTransactional:" + e, e);
|
logger.error("appendDeleteRecordTransactional:" + e, e);
|
||||||
setErrorCondition(null, tx, e);
|
setErrorCondition(null, tx, e);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -1185,7 +1195,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
}
|
}
|
||||||
|
|
||||||
tx.prepare(usedFile);
|
tx.prepare(usedFile);
|
||||||
} catch (Exception e) {
|
} catch (ActiveMQShutdownException e) {
|
||||||
|
result.fail(e);
|
||||||
|
logger.error("appendPrepareRecord:" + e, e);
|
||||||
|
} catch (Throwable e) {
|
||||||
result.fail(e);
|
result.fail(e);
|
||||||
logger.error("appendPrepareRecord:" + e, e);
|
logger.error("appendPrepareRecord:" + e, e);
|
||||||
setErrorCondition(callback, tx, e);
|
setErrorCondition(callback, tx, e);
|
||||||
|
@ -1267,6 +1280,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
|
|
||||||
|
|
||||||
tx.commit(usedFile);
|
tx.commit(usedFile);
|
||||||
|
} catch (ActiveMQShutdownException e) {
|
||||||
|
result.fail(e);
|
||||||
|
logger.error("appendCommitRecord:" + e, e);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
result.fail(e);
|
result.fail(e);
|
||||||
logger.error("appendCommitRecord:" + e, e);
|
logger.error("appendCommitRecord:" + e, e);
|
||||||
|
@ -1317,6 +1333,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
|
JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
|
||||||
|
|
||||||
tx.rollback(usedFile);
|
tx.rollback(usedFile);
|
||||||
|
} catch (ActiveMQShutdownException e) {
|
||||||
|
result.fail(e);
|
||||||
|
logger.error("appendRollbackRecord:" + e, e);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
result.fail(e);
|
result.fail(e);
|
||||||
logger.error("appendRollbackRecord:" + e, e);
|
logger.error("appendRollbackRecord:" + e, e);
|
||||||
|
@ -2360,10 +2379,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
setJournalState(JournalState.STOPPED);
|
|
||||||
|
|
||||||
flush();
|
flush();
|
||||||
|
|
||||||
|
setJournalState(JournalState.STOPPED);
|
||||||
|
|
||||||
if (providedIOThreadPool == null) {
|
if (providedIOThreadPool == null) {
|
||||||
threadPool.shutdown();
|
threadPool.shutdown();
|
||||||
|
|
||||||
|
@ -2681,6 +2700,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
final JournalTransaction tx,
|
final JournalTransaction tx,
|
||||||
final IOCallback parameterCallback) throws Exception {
|
final IOCallback parameterCallback) throws Exception {
|
||||||
|
|
||||||
|
checkJournalIsLoaded();
|
||||||
|
|
||||||
final IOCallback callback;
|
final IOCallback callback;
|
||||||
|
|
||||||
final int size = encoder.getEncodeSize();
|
final int size = encoder.getEncodeSize();
|
||||||
|
|
|
@ -126,8 +126,10 @@ import org.jboss.logging.Logger;
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements StorageManager {
|
public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements StorageManager {
|
||||||
|
|
||||||
private static final int CRITICAL_PATHS = 1;
|
protected static final int CRITICAL_PATHS = 3;
|
||||||
private static final int CRITICAL_STORE = 0;
|
protected static final int CRITICAL_STORE = 0;
|
||||||
|
protected static final int CRITICAL_STOP = 1;
|
||||||
|
protected static final int CRITICAL_STOP_2 = 2;
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class);
|
private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class);
|
||||||
|
|
||||||
|
@ -405,6 +407,16 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
leaveCritical(CRITICAL_STORE);
|
leaveCritical(CRITICAL_STORE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** for internal use and testsuite, don't use it outside of tests */
|
||||||
|
public void writeLock() {
|
||||||
|
storageManagerLock.writeLock().lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** for internal use and testsuite, don't use it outside of tests */
|
||||||
|
public void writeUnlock() {
|
||||||
|
storageManagerLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void storeAcknowledge(final long queueID, final long messageID) throws Exception {
|
public void storeAcknowledge(final long queueID, final long messageID) throws Exception {
|
||||||
readLock();
|
readLock();
|
||||||
|
|
|
@ -229,9 +229,21 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
|
public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
|
||||||
|
try {
|
||||||
|
enterCritical(CRITICAL_STOP);
|
||||||
|
synchronized (this) {
|
||||||
|
if (internalStop(ioCriticalError, sendFailover))
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
leaveCritical(CRITICAL_STOP);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean internalStop(boolean ioCriticalError, boolean sendFailover) throws Exception {
|
||||||
if (!started) {
|
if (!started) {
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!ioCriticalError) {
|
if (!ioCriticalError) {
|
||||||
|
@ -255,30 +267,41 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
// that's ok
|
// that's ok
|
||||||
}
|
}
|
||||||
|
|
||||||
// We cache the variable as the replicator could be changed between here and the time we call stop
|
enterCritical(CRITICAL_STOP_2);
|
||||||
// since sendLiveIsStopping may issue a close back from the channel
|
storageManagerLock.writeLock().lock();
|
||||||
// and we want to ensure a stop here just in case
|
try {
|
||||||
ReplicationManager replicatorInUse = replicator;
|
|
||||||
if (replicatorInUse != null) {
|
// We cache the variable as the replicator could be changed between here and the time we call stop
|
||||||
if (sendFailover) {
|
// since sendLiveIsStopping may issue a close back from the channel
|
||||||
final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER);
|
// and we want to ensure a stop here just in case
|
||||||
if (token != null) {
|
ReplicationManager replicatorInUse = replicator;
|
||||||
try {
|
if (replicatorInUse != null) {
|
||||||
token.waitCompletion(5000);
|
if (sendFailover) {
|
||||||
} catch (Exception e) {
|
final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER);
|
||||||
// ignore it
|
if (token != null) {
|
||||||
|
try {
|
||||||
|
token.waitCompletion(5000);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// ignore it
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// we cannot clear replication tokens, otherwise clients will eventually be informed of completion during a server's shutdown
|
||||||
|
// while the backup will never receive then
|
||||||
|
replicatorInUse.stop(false);
|
||||||
}
|
}
|
||||||
replicatorInUse.stop();
|
bindingsJournal.stop();
|
||||||
|
|
||||||
|
messageJournal.stop();
|
||||||
|
|
||||||
|
journalLoaded = false;
|
||||||
|
|
||||||
|
started = false;
|
||||||
|
} finally {
|
||||||
|
storageManagerLock.writeLock().unlock();
|
||||||
|
leaveCritical(CRITICAL_STOP_2);
|
||||||
}
|
}
|
||||||
bindingsJournal.stop();
|
return false;
|
||||||
|
|
||||||
messageJournal.stop();
|
|
||||||
|
|
||||||
journalLoaded = false;
|
|
||||||
|
|
||||||
started = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -150,6 +150,20 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
||||||
journals[id] = journal;
|
journals[id] = journal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is for tests basically, do not use it as its API is not guaranteed for future usage.
|
||||||
|
*/
|
||||||
|
public void pause() {
|
||||||
|
started = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is for tests basically, do not use it as its API is not guaranteed for future usage.
|
||||||
|
*/
|
||||||
|
public void resume() {
|
||||||
|
started = true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handlePacket(final Packet packet) {
|
public void handlePacket(final Packet packet) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
|
|
|
@ -282,6 +282,10 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
|
stop(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop(boolean clearTokens) throws Exception {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (!started) {
|
if (!started) {
|
||||||
logger.trace("Stopping being ignored as it hasn't been started");
|
logger.trace("Stopping being ignored as it hasn't been started");
|
||||||
|
@ -297,7 +301,10 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
|
|
||||||
enabled = false;
|
enabled = false;
|
||||||
writable.set(true);
|
writable.set(true);
|
||||||
clearReplicationTokens();
|
|
||||||
|
if (clearTokens) {
|
||||||
|
clearReplicationTokens();
|
||||||
|
}
|
||||||
|
|
||||||
RemotingConnection toStop = remotingConnection;
|
RemotingConnection toStop = remotingConnection;
|
||||||
if (toStop != null) {
|
if (toStop != null) {
|
||||||
|
|
|
@ -0,0 +1,150 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.cluster.failover;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
|
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
||||||
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
|
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
|
import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
|
||||||
|
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class NettyReplicationStopTest extends FailoverTestBase {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TestableServer createTestableServer(Configuration config) {
|
||||||
|
return new SameProcessActiveMQServer(createServer(true, config));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void createConfigs() throws Exception {
|
||||||
|
createReplicatedConfigs();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected NodeManager createNodeManager() throws Exception {
|
||||||
|
return new InVMNodeManager(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
|
||||||
|
return getNettyAcceptorTransportConfiguration(live);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
|
||||||
|
return getNettyConnectorTransportConfiguration(live);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected final void crash(boolean waitFailure, ClientSession... sessions) throws Exception {
|
||||||
|
if (sessions.length > 0) {
|
||||||
|
for (ClientSession session : sessions) {
|
||||||
|
waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
waitForRemoteBackup(null, 5, true, backupServer.getServer());
|
||||||
|
}
|
||||||
|
super.crash(waitFailure, sessions);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected final void crash(ClientSession... sessions) throws Exception {
|
||||||
|
crash(true, sessions);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplicaStop() throws Exception {
|
||||||
|
|
||||||
|
Map<String, Object> params = new HashMap<>();
|
||||||
|
params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");
|
||||||
|
TransportConfiguration tc = createTransportConfiguration(true, false, params);
|
||||||
|
|
||||||
|
ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(15);
|
||||||
|
|
||||||
|
final ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
|
||||||
|
|
||||||
|
ClientSession session = sf.createSession(true, true);
|
||||||
|
|
||||||
|
session.createQueue(ADDRESS, ADDRESS, null, true);
|
||||||
|
|
||||||
|
ClientProducer producer = session.createProducer(ADDRESS);
|
||||||
|
|
||||||
|
final int numMessages = 10;
|
||||||
|
|
||||||
|
ReplicationEndpoint endpoint = backupServer.getServer().getReplicationEndpoint();
|
||||||
|
|
||||||
|
endpoint.pause();
|
||||||
|
|
||||||
|
ArrayList<Thread> threads = new ArrayList<>();
|
||||||
|
final ArrayList<Integer> codesSent = new ArrayList<>();
|
||||||
|
|
||||||
|
CountDownLatch alignedOnSend = new CountDownLatch(10);
|
||||||
|
|
||||||
|
for (int i = 0; i < numMessages; i++) {
|
||||||
|
final int code = i;
|
||||||
|
Thread t = new Thread("WillSend " + code) {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
ClientSession session = sf.createSession(true, true);
|
||||||
|
|
||||||
|
ClientProducer producer = session.createProducer(ADDRESS);
|
||||||
|
|
||||||
|
ClientMessage message = session.createMessage(true).putIntProperty("i", code);
|
||||||
|
alignedOnSend.countDown();
|
||||||
|
System.out.println("blocking!!");
|
||||||
|
producer.send(message);
|
||||||
|
codesSent.add(code);
|
||||||
|
|
||||||
|
System.out.println("Sent!");
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
// that's ok;
|
||||||
|
e.printStackTrace(); // logging just for debug & reference
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
t.start();
|
||||||
|
|
||||||
|
threads.add(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertTrue(alignedOnSend.await(10, TimeUnit.SECONDS));
|
||||||
|
liveServer.stop();
|
||||||
|
|
||||||
|
Assert.assertEquals(0, codesSent.size());
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
import org.apache.activemq.artemis.core.config.Configuration;
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
@ -364,6 +365,105 @@ public class BasicXaTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPrepareError() throws Exception {
|
||||||
|
Xid xid = newXID();
|
||||||
|
|
||||||
|
ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
|
||||||
|
ClientProducer clientProducer = clientSession2.createProducer(atestq);
|
||||||
|
ClientMessage m1 = createTextMessage(clientSession2, "m1");
|
||||||
|
ClientMessage m2 = createTextMessage(clientSession2, "m2");
|
||||||
|
ClientMessage m3 = createTextMessage(clientSession2, "m3");
|
||||||
|
ClientMessage m4 = createTextMessage(clientSession2, "m4");
|
||||||
|
clientProducer.send(m1);
|
||||||
|
clientProducer.send(m2);
|
||||||
|
clientProducer.send(m3);
|
||||||
|
clientProducer.send(m4);
|
||||||
|
|
||||||
|
clientSession.start(xid, XAResource.TMNOFLAGS);
|
||||||
|
clientSession.start();
|
||||||
|
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
|
||||||
|
ClientMessage m = clientConsumer.receive(1000);
|
||||||
|
Assert.assertNotNull(m);
|
||||||
|
m.acknowledge();
|
||||||
|
Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
|
||||||
|
m = clientConsumer.receive(1000);
|
||||||
|
Assert.assertNotNull(m);
|
||||||
|
m.acknowledge();
|
||||||
|
Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
|
||||||
|
m = clientConsumer.receive(1000);
|
||||||
|
Assert.assertNotNull(m);
|
||||||
|
m.acknowledge();
|
||||||
|
Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
|
||||||
|
m = clientConsumer.receive(1000);
|
||||||
|
Assert.assertNotNull(m);
|
||||||
|
m.acknowledge();
|
||||||
|
Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
|
||||||
|
clientSession.end(xid, XAResource.TMSUCCESS);
|
||||||
|
|
||||||
|
StorageManager journalStorageManager = messagingService.getStorageManager();
|
||||||
|
|
||||||
|
clientSession.prepare(xid);
|
||||||
|
|
||||||
|
journalStorageManager.getMessageJournal().stop();
|
||||||
|
try {
|
||||||
|
clientSession.commit(xid, false);
|
||||||
|
Assert.fail("Exception exptected");
|
||||||
|
} catch (XAException e) {
|
||||||
|
Assert.assertTrue(e.errorCode == XAException.XA_RETRY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRollbackError() throws Exception {
|
||||||
|
Xid xid = newXID();
|
||||||
|
|
||||||
|
ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
|
||||||
|
ClientProducer clientProducer = clientSession2.createProducer(atestq);
|
||||||
|
ClientMessage m1 = createTextMessage(clientSession2, "m1");
|
||||||
|
ClientMessage m2 = createTextMessage(clientSession2, "m2");
|
||||||
|
ClientMessage m3 = createTextMessage(clientSession2, "m3");
|
||||||
|
ClientMessage m4 = createTextMessage(clientSession2, "m4");
|
||||||
|
clientProducer.send(m1);
|
||||||
|
clientProducer.send(m2);
|
||||||
|
clientProducer.send(m3);
|
||||||
|
clientProducer.send(m4);
|
||||||
|
|
||||||
|
clientSession.start(xid, XAResource.TMNOFLAGS);
|
||||||
|
clientSession.start();
|
||||||
|
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
|
||||||
|
ClientMessage m = clientConsumer.receive(1000);
|
||||||
|
Assert.assertNotNull(m);
|
||||||
|
m.acknowledge();
|
||||||
|
Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
|
||||||
|
m = clientConsumer.receive(1000);
|
||||||
|
Assert.assertNotNull(m);
|
||||||
|
m.acknowledge();
|
||||||
|
Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
|
||||||
|
m = clientConsumer.receive(1000);
|
||||||
|
Assert.assertNotNull(m);
|
||||||
|
m.acknowledge();
|
||||||
|
Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
|
||||||
|
m = clientConsumer.receive(1000);
|
||||||
|
Assert.assertNotNull(m);
|
||||||
|
m.acknowledge();
|
||||||
|
Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
|
||||||
|
clientSession.end(xid, XAResource.TMSUCCESS);
|
||||||
|
|
||||||
|
StorageManager journalStorageManager = messagingService.getStorageManager();
|
||||||
|
|
||||||
|
clientSession.prepare(xid);
|
||||||
|
|
||||||
|
journalStorageManager.getMessageJournal().stop();
|
||||||
|
try {
|
||||||
|
clientSession.rollback(xid);
|
||||||
|
Assert.fail("Exception exptected");
|
||||||
|
} catch (XAException e) {
|
||||||
|
Assert.assertTrue(e.errorCode == XAException.XA_RETRY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReceiveRollback() throws Exception {
|
public void testReceiveRollback() throws Exception {
|
||||||
int numSessions = 100;
|
int numSessions = 100;
|
||||||
|
|
Loading…
Reference in New Issue