ARTEMIS-1269 Fixing blocked replication
If replication blocked anything on the journal the processing from clients would be blocked and nothing would work. As part of this fix I am using an executor on ServerSessionPacketHandler which will also scale better as the reader from Netty would be feed immediately.
This commit is contained in:
parent
f3cc555ab0
commit
89e84e1320
|
@ -18,7 +18,9 @@ package org.apache.activemq.artemis.utils;
|
||||||
|
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||||
|
@ -33,6 +35,21 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
|
||||||
|
|
||||||
private final Executor parent;
|
private final Executor parent;
|
||||||
|
|
||||||
|
|
||||||
|
public static boolean flushExecutor(Executor executor) {
|
||||||
|
return flushExecutor(executor, 30, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean flushExecutor(Executor executor, long timeout, TimeUnit unit) {
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
executor.execute(latch::countDown);
|
||||||
|
try {
|
||||||
|
return latch.await(timeout, unit);
|
||||||
|
} catch (Exception e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct a new instance delegating to the given parent executor.
|
* Construct a new instance delegating to the given parent executor.
|
||||||
*
|
*
|
||||||
|
|
|
@ -96,7 +96,7 @@ public final class FileWrapperJournal extends JournalBase {
|
||||||
IOCompletion callback) throws Exception {
|
IOCompletion callback) throws Exception {
|
||||||
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record);
|
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record);
|
||||||
|
|
||||||
writeRecord(addRecord, sync, callback);
|
writeRecord(addRecord, false, -1, false, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -107,7 +107,9 @@ public final class FileWrapperJournal extends JournalBase {
|
||||||
* Write the record to the current file.
|
* Write the record to the current file.
|
||||||
*/
|
*/
|
||||||
private void writeRecord(JournalInternalRecord encoder,
|
private void writeRecord(JournalInternalRecord encoder,
|
||||||
final boolean sync,
|
final boolean tx,
|
||||||
|
final long txID,
|
||||||
|
final boolean removeTX,
|
||||||
final IOCompletion callback) throws Exception {
|
final IOCompletion callback) throws Exception {
|
||||||
|
|
||||||
lockAppend.lock();
|
lockAppend.lock();
|
||||||
|
@ -115,30 +117,54 @@ public final class FileWrapperJournal extends JournalBase {
|
||||||
if (callback != null) {
|
if (callback != null) {
|
||||||
callback.storeLineUp();
|
callback.storeLineUp();
|
||||||
}
|
}
|
||||||
currentFile = journal.switchFileIfNecessary(encoder.getEncodeSize());
|
testSwitchFiles(encoder);
|
||||||
|
if (txID >= 0) {
|
||||||
|
if (tx) {
|
||||||
|
AtomicInteger value;
|
||||||
|
if (removeTX) {
|
||||||
|
value = transactions.remove(txID);
|
||||||
|
} else {
|
||||||
|
value = transactions.get(txID);
|
||||||
|
}
|
||||||
|
if (value != null) {
|
||||||
|
encoder.setNumberOfRecords(value.get());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
count(txID);
|
||||||
|
}
|
||||||
|
}
|
||||||
encoder.setFileID(currentFile.getRecordID());
|
encoder.setFileID(currentFile.getRecordID());
|
||||||
|
|
||||||
if (callback != null) {
|
if (callback != null) {
|
||||||
currentFile.getFile().write(encoder, sync, callback);
|
currentFile.getFile().write(encoder, false, callback);
|
||||||
} else {
|
} else {
|
||||||
currentFile.getFile().write(encoder, sync);
|
currentFile.getFile().write(encoder, false);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
lockAppend.unlock();
|
lockAppend.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void testSwitchFiles(JournalInternalRecord encoder) throws Exception {
|
||||||
|
JournalFile oldFile = currentFile;
|
||||||
|
currentFile = journal.switchFileIfNecessary(encoder.getEncodeSize());
|
||||||
|
if (oldFile != currentFile) {
|
||||||
|
for (AtomicInteger value : transactions.values()) {
|
||||||
|
value.set(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception {
|
public void appendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception {
|
||||||
JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
|
JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
|
||||||
writeRecord(deleteRecord, sync, callback);
|
writeRecord(deleteRecord, false, -1, false, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
|
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
|
||||||
count(txID);
|
|
||||||
JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
|
JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
|
||||||
writeRecord(deleteRecordTX, false, null);
|
writeRecord(deleteRecordTX, false, txID, false, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -147,9 +173,8 @@ public final class FileWrapperJournal extends JournalBase {
|
||||||
byte recordType,
|
byte recordType,
|
||||||
Persister persister,
|
Persister persister,
|
||||||
Object record) throws Exception {
|
Object record) throws Exception {
|
||||||
count(txID);
|
|
||||||
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
|
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
|
||||||
writeRecord(addRecord, false, null);
|
writeRecord(addRecord, false, txID, false, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -160,7 +185,7 @@ public final class FileWrapperJournal extends JournalBase {
|
||||||
boolean sync,
|
boolean sync,
|
||||||
IOCompletion callback) throws Exception {
|
IOCompletion callback) throws Exception {
|
||||||
JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
|
JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
|
||||||
writeRecord(updateRecord, sync, callback);
|
writeRecord(updateRecord, false, -1, false, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -169,9 +194,8 @@ public final class FileWrapperJournal extends JournalBase {
|
||||||
byte recordType,
|
byte recordType,
|
||||||
Persister persister,
|
Persister persister,
|
||||||
Object record) throws Exception {
|
Object record) throws Exception {
|
||||||
count(txID);
|
|
||||||
JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, persister, record);
|
JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, persister, record);
|
||||||
writeRecord(updateRecordTX, false, null);
|
writeRecord(updateRecordTX, false, txID, false, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -180,12 +204,8 @@ public final class FileWrapperJournal extends JournalBase {
|
||||||
IOCompletion callback,
|
IOCompletion callback,
|
||||||
boolean lineUpContext) throws Exception {
|
boolean lineUpContext) throws Exception {
|
||||||
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
|
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
|
||||||
AtomicInteger value = transactions.remove(txID);
|
|
||||||
if (value != null) {
|
|
||||||
commitRecord.setNumberOfRecords(value.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
writeRecord(commitRecord, true, callback);
|
writeRecord(commitRecord, true, txID, true, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -194,20 +214,18 @@ public final class FileWrapperJournal extends JournalBase {
|
||||||
boolean sync,
|
boolean sync,
|
||||||
IOCompletion callback) throws Exception {
|
IOCompletion callback) throws Exception {
|
||||||
JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
|
JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
|
||||||
AtomicInteger value = transactions.get(txID);
|
writeRecord(prepareRecord, true, txID, false, callback);
|
||||||
if (value != null) {
|
|
||||||
prepareRecord.setNumberOfRecords(value.get());
|
|
||||||
}
|
|
||||||
writeRecord(prepareRecord, sync, callback);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private int count(long txID) throws ActiveMQException {
|
private int count(long txID) throws ActiveMQException {
|
||||||
AtomicInteger defaultValue = new AtomicInteger(1);
|
AtomicInteger defaultValue = new AtomicInteger(1);
|
||||||
AtomicInteger count = transactions.putIfAbsent(txID, defaultValue);
|
AtomicInteger count = transactions.putIfAbsent(txID, defaultValue);
|
||||||
if (count != null) {
|
if (count != null) {
|
||||||
return count.incrementAndGet();
|
count.incrementAndGet();
|
||||||
|
} else {
|
||||||
|
count = defaultValue;
|
||||||
}
|
}
|
||||||
return defaultValue.get();
|
return count.intValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -218,11 +236,7 @@ public final class FileWrapperJournal extends JournalBase {
|
||||||
@Override
|
@Override
|
||||||
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
|
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
|
||||||
JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
|
JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
|
||||||
AtomicInteger value = transactions.remove(txID);
|
writeRecord(rollbackRecord, true, txID, true, callback);
|
||||||
if (value != null) {
|
|
||||||
rollbackRecord.setNumberOfRecords(value.get());
|
|
||||||
}
|
|
||||||
writeRecord(rollbackRecord, sync, callback);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// UNSUPPORTED STUFF
|
// UNSUPPORTED STUFF
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core;
|
||||||
import javax.transaction.xa.XAResource;
|
import javax.transaction.xa.XAResource;
|
||||||
import javax.transaction.xa.Xid;
|
import javax.transaction.xa.Xid;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
|
@ -90,6 +91,9 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
|
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
||||||
|
import org.apache.activemq.artemis.utils.SimpleFuture;
|
||||||
|
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
|
||||||
|
@ -141,6 +145,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
|
|
||||||
private volatile CoreRemotingConnection remotingConnection;
|
private volatile CoreRemotingConnection remotingConnection;
|
||||||
|
|
||||||
|
private final Executor callExecutor;
|
||||||
|
|
||||||
private final CoreProtocolManager manager;
|
private final CoreProtocolManager manager;
|
||||||
|
|
||||||
// The current currentLargeMessage being processed
|
// The current currentLargeMessage being processed
|
||||||
|
@ -148,7 +154,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
|
|
||||||
private final boolean direct;
|
private final boolean direct;
|
||||||
|
|
||||||
public ServerSessionPacketHandler(final CoreProtocolManager manager,
|
public ServerSessionPacketHandler(final Executor callExecutor,
|
||||||
|
final CoreProtocolManager manager,
|
||||||
final ServerSession session,
|
final ServerSession session,
|
||||||
final StorageManager storageManager,
|
final StorageManager storageManager,
|
||||||
final Channel channel) {
|
final Channel channel) {
|
||||||
|
@ -166,6 +173,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
|
|
||||||
Connection conn = remotingConnection.getTransportConnection();
|
Connection conn = remotingConnection.getTransportConnection();
|
||||||
|
|
||||||
|
this.callExecutor = callExecutor;
|
||||||
|
|
||||||
if (conn instanceof NettyConnection) {
|
if (conn instanceof NettyConnection) {
|
||||||
direct = ((NettyConnection) conn).isDirectDeliver();
|
direct = ((NettyConnection) conn).isDirectDeliver();
|
||||||
} else {
|
} else {
|
||||||
|
@ -199,11 +208,18 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
ActiveMQServerLogger.LOGGER.errorClosingSession(e);
|
ActiveMQServerLogger.LOGGER.errorClosingSession(e);
|
||||||
}
|
}
|
||||||
|
flushExecutor();
|
||||||
|
|
||||||
ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName());
|
ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void flushExecutor() {
|
||||||
|
OrderedExecutorFactory.flushExecutor(callExecutor);
|
||||||
|
}
|
||||||
|
|
||||||
public void close() {
|
public void close() {
|
||||||
|
flushExecutor();
|
||||||
|
|
||||||
channel.flushConfirmations();
|
channel.flushConfirmations();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -219,6 +235,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handlePacket(final Packet packet) {
|
public void handlePacket(final Packet packet) {
|
||||||
|
channel.confirm(packet);
|
||||||
|
callExecutor.execute(() -> internalHandlePacket(packet));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void internalHandlePacket(final Packet packet) {
|
||||||
byte type = packet.getType();
|
byte type = packet.getType();
|
||||||
|
|
||||||
storageManager.setContext(session.getSessionContext());
|
storageManager.setContext(session.getSessionContext());
|
||||||
|
@ -653,8 +674,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
final boolean flush,
|
final boolean flush,
|
||||||
final boolean closeChannel) {
|
final boolean closeChannel) {
|
||||||
if (confirmPacket != null) {
|
if (confirmPacket != null) {
|
||||||
channel.confirm(confirmPacket);
|
|
||||||
|
|
||||||
if (flush) {
|
if (flush) {
|
||||||
channel.flushConfirmations();
|
channel.flushConfirmations();
|
||||||
}
|
}
|
||||||
|
@ -678,9 +697,26 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
remotingConnection.removeFailureListener((FailureListener) closeListener);
|
remotingConnection.removeFailureListener((FailureListener) closeListener);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
flushExecutor();
|
||||||
}
|
}
|
||||||
|
|
||||||
public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID) {
|
public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID) {
|
||||||
|
|
||||||
|
SimpleFuture<Integer> future = new SimpleFutureImpl<>();
|
||||||
|
callExecutor.execute(() -> {
|
||||||
|
int value = internaltransferConnection(newConnection, lastReceivedCommandID);
|
||||||
|
future.set(value);
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
return future.get().intValue();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IllegalStateException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int internaltransferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID) {
|
||||||
// We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get
|
// We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get
|
||||||
// delivered
|
// delivered
|
||||||
// after the channel has transferred but *before* packets have been replayed - this will give the client the wrong
|
// after the channel has transferred but *before* packets have been replayed - this will give the client the wrong
|
||||||
|
|
|
@ -169,7 +169,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
|
||||||
|
|
||||||
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, routingTypeMap);
|
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, routingTypeMap);
|
||||||
|
|
||||||
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(protocolManager, session, server.getStorageManager(), channel);
|
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server.getExecutorFactory().getExecutor(), protocolManager, session, server.getStorageManager(), channel);
|
||||||
channel.setHandler(handler);
|
channel.setHandler(handler);
|
||||||
|
|
||||||
// TODO - where is this removed?
|
// TODO - where is this removed?
|
||||||
|
|
|
@ -27,9 +27,7 @@ import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
|
@ -81,6 +79,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
|
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||||
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
|
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
|
||||||
|
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -204,9 +203,11 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
||||||
ActiveMQServerLogger.LOGGER.invalidPacketForReplication(packet);
|
ActiveMQServerLogger.LOGGER.invalidPacketForReplication(packet);
|
||||||
}
|
}
|
||||||
} catch (ActiveMQException e) {
|
} catch (ActiveMQException e) {
|
||||||
|
logger.warn(e.getMessage(), e);
|
||||||
ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
|
ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
|
||||||
response = new ActiveMQExceptionMessage(e);
|
response = new ActiveMQExceptionMessage(e);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
logger.warn(e.getMessage(), e);
|
||||||
ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
|
ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
|
||||||
response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e));
|
response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e));
|
||||||
}
|
}
|
||||||
|
@ -278,6 +279,12 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.trace("Stopping endpoint");
|
||||||
|
|
||||||
|
started = false;
|
||||||
|
|
||||||
|
OrderedExecutorFactory.flushExecutor(executor);
|
||||||
|
|
||||||
// Channel may be null if there isn't a connection to a live server
|
// Channel may be null if there isn't a connection to a live server
|
||||||
if (channel != null) {
|
if (channel != null) {
|
||||||
channel.close();
|
channel.close();
|
||||||
|
@ -315,15 +322,6 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
||||||
pageManager.stop();
|
pageManager.stop();
|
||||||
|
|
||||||
pageIndex.clear();
|
pageIndex.clear();
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
|
||||||
executor.execute(new Runnable() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
latch.countDown();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
latch.await(30, TimeUnit.SECONDS);
|
|
||||||
|
|
||||||
// Storage needs to be the last to stop
|
// Storage needs to be the last to stop
|
||||||
storageManager.stop();
|
storageManager.stop();
|
||||||
|
@ -471,28 +469,13 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet);
|
logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet);
|
||||||
}
|
}
|
||||||
|
ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
|
||||||
|
if (!started)
|
||||||
|
return replicationResponseMessage;
|
||||||
|
|
||||||
if (packet.isSynchronizationFinished()) {
|
if (packet.isSynchronizationFinished()) {
|
||||||
executor.execute(() -> {
|
|
||||||
try {
|
|
||||||
// this is a long running process, we cannot block the reading thread from netty
|
|
||||||
finishSynchronization(packet.getNodeID());
|
finishSynchronization(packet.getNodeID());
|
||||||
if (logger.isTraceEnabled()) {
|
replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
|
||||||
logger.trace("returning completion on synchronization catchup");
|
|
||||||
}
|
|
||||||
channel.send(new ReplicationResponseMessageV2().setSynchronizationIsFinishedAcknowledgement(true));
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.warn(e.getMessage());
|
|
||||||
channel.send(new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e)));
|
|
||||||
}
|
|
||||||
|
|
||||||
});
|
|
||||||
// the write will happen through an executor
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
|
|
||||||
if (!started) {
|
|
||||||
return replicationResponseMessage;
|
return replicationResponseMessage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -326,6 +326,9 @@ public class ClusterController implements ActiveMQComponent {
|
||||||
@Override
|
@Override
|
||||||
public void handlePacket(Packet packet) {
|
public void handlePacket(Packet packet) {
|
||||||
if (!isStarted()) {
|
if (!isStarted()) {
|
||||||
|
if (channelHandler != null) {
|
||||||
|
channelHandler.handlePacket(packet);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1750,7 +1750,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
|
|
||||||
queue.deleteQueue(removeConsumers);
|
queue.deleteQueue(removeConsumers);
|
||||||
|
|
||||||
if (autoDeleteAddress && postOffice != null && getAddressInfo(address).isAutoCreated()) {
|
AddressInfo addressInfo = getAddressInfo(address);
|
||||||
|
|
||||||
|
if (autoDeleteAddress && postOffice != null && addressInfo != null && addressInfo.isAutoCreated()) {
|
||||||
try {
|
try {
|
||||||
removeAddressInfo(address, session);
|
removeAddressInfo(address, session);
|
||||||
} catch (ActiveMQDeleteAddressException e) {
|
} catch (ActiveMQDeleteAddressException e) {
|
||||||
|
|
|
@ -16,8 +16,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.cluster.reattach;
|
package org.apache.activemq.artemis.tests.integration.cluster.reattach;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
import org.apache.activemq.artemis.core.config.Configuration;
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
|
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A MultiThreadRandomReattachTest
|
* A MultiThreadRandomReattachTest
|
||||||
|
@ -28,6 +30,7 @@ public class MultiThreadRandomReattachTest extends MultiThreadRandomReattachTest
|
||||||
protected void start() throws Exception {
|
protected void start() throws Exception {
|
||||||
Configuration liveConf = createDefaultInVMConfig();
|
Configuration liveConf = createDefaultInVMConfig();
|
||||||
server = createServer(false, liveConf);
|
server = createServer(false, liveConf);
|
||||||
|
server.getConfiguration().getAddressConfigurations().add(new CoreAddressConfiguration().setName(ADDRESS.toString()).addRoutingType(RoutingType.MULTICAST));
|
||||||
server.start();
|
server.start();
|
||||||
waitForServerToStart(server);
|
waitForServerToStart(server);
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,8 +16,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.cluster.reattach;
|
package org.apache.activemq.artemis.tests.integration.cluster.reattach;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
import org.apache.activemq.artemis.core.config.Configuration;
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
|
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
|
||||||
|
|
||||||
public class NettyMultiThreadRandomReattachTest extends MultiThreadRandomReattachTest {
|
public class NettyMultiThreadRandomReattachTest extends MultiThreadRandomReattachTest {
|
||||||
|
|
||||||
|
@ -25,6 +27,7 @@ public class NettyMultiThreadRandomReattachTest extends MultiThreadRandomReattac
|
||||||
protected void start() throws Exception {
|
protected void start() throws Exception {
|
||||||
Configuration liveConf = createDefaultNettyConfig();
|
Configuration liveConf = createDefaultNettyConfig();
|
||||||
server = createServer(false, liveConf);
|
server = createServer(false, liveConf);
|
||||||
|
server.getConfiguration().getAddressConfigurations().add(new CoreAddressConfiguration().setName(ADDRESS.toString()).addRoutingType(RoutingType.MULTICAST));
|
||||||
server.start();
|
server.start();
|
||||||
waitForServerToStart(server);
|
waitForServerToStart(server);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue