ARTEMIS-1975 Removing ThreadLocal for StorageManager

This commit is contained in:
Clebert Suconic 2020-03-25 13:54:34 -04:00
parent 31e71ba905
commit ca50b3449e
29 changed files with 148 additions and 63 deletions

View File

@ -91,7 +91,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
private volatile AmqpReadableBuffer parsingData; private volatile AmqpReadableBuffer parsingData;
private final StorageManager storageManager; private StorageManager storageManager;
public AMQPLargeMessage(long id, public AMQPLargeMessage(long id,
long messageFormat, long messageFormat,
@ -183,6 +183,12 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
return largeBody.getStorageManager(); return largeBody.getStorageManager();
} }
@Override
public void setStorageManager(StorageManager storageManager) {
largeBody.setStorageManager(storageManager);
this.storageManager = storageManager;
}
@Override @Override
public final boolean isDurable() { public final boolean isDurable() {
if (fileDurable != null) { if (fileDurable != null) {

View File

@ -22,7 +22,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.artemis.utils.collections.TypedProperties;
@ -116,7 +115,7 @@ public class AMQPLargeMessagePersister extends MessagePersister {
properties = null; properties = null;
} }
AMQPLargeMessage largeMessage = new AMQPLargeMessage(id, format, properties, null, AbstractJournalStorageManager.getThreadLocal()); AMQPLargeMessage largeMessage = new AMQPLargeMessage(id, format, properties, null, null);
largeMessage.setFileDurable(durable); largeMessage.setFileDurable(durable);
if (address != null) { if (address != null) {

View File

@ -100,7 +100,6 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
@Override @Override
public void onDelivery(Consumer<? super MessageReference> onDelivery) { public void onDelivery(Consumer<? super MessageReference> onDelivery) {
assert this.onDelivery == null;
this.onDelivery = onDelivery; this.onDelivery = onDelivery;
} }

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
@ -381,6 +382,7 @@ public final class Page implements Comparable<Page> {
fileBuffer.position(endPosition + 1); fileBuffer.position(endPosition + 1);
assert fileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte"; assert fileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte";
msg.initMessage(storage); msg.initMessage(storage);
assert msg.getMessage() instanceof LargeServerMessage && ((LargeServerMessage)msg.getMessage()).getStorageManager() != null || !(msg.getMessage() instanceof LargeServerMessage);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.tracef("Reading message %s on pageId=%d for address=%s", msg, pageId, storeName); logger.tracef("Reading message %s on pageId=%d for address=%s", msg, pageId, storeName);
} }

View File

@ -89,10 +89,17 @@ public class PagedMessageImpl implements PagedMessage {
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(largeMessageLazyData); ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(largeMessageLazyData);
lgMessage = LargeMessagePersister.getInstance().decode(buffer, lgMessage, null); lgMessage = LargeMessagePersister.getInstance().decode(buffer, lgMessage, null);
if (lgMessage.toMessage() instanceof LargeServerMessage) {
((LargeServerMessage)lgMessage.toMessage()).setStorageManager(storage);
}
lgMessage.toMessage().usageUp(); lgMessage.toMessage().usageUp();
lgMessage.setPaged(); lgMessage.setPaged();
this.message = lgMessage.toMessage(); this.message = lgMessage.toMessage();
largeMessageLazyData = null; largeMessageLazyData = null;
} else {
if (message != null && message instanceof LargeServerMessage) {
((LargeServerMessage)message).setStorageManager(storageManager);
}
} }
} }
@ -123,10 +130,11 @@ public class PagedMessageImpl implements PagedMessage {
} else { } else {
this.message = storageManager.createLargeMessage().toMessage(); this.message = storageManager.createLargeMessage().toMessage();
LargeMessagePersister.getInstance().decode(buffer, (LargeServerMessage) message, null); LargeMessagePersister.getInstance().decode(buffer, (LargeServerMessage) message, null);
((LargeServerMessage) message).setStorageManager(storageManager);
((LargeServerMessage) message).toMessage().usageUp(); ((LargeServerMessage) message).toMessage().usageUp();
} }
} else { } else {
this.message = MessagePersister.getInstance().decode(buffer, null, null); this.message = MessagePersister.getInstance().decode(buffer, null, null, storageManager);
} }
int queueIDsSize = buffer.readInt(); int queueIDsSize = buffer.readInt();

View File

@ -135,18 +135,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
protected static final int CRITICAL_STOP_2 = 2; protected static final int CRITICAL_STOP_2 = 2;
public static ThreadLocal<StorageManager> storageManagerThreadLocal = new ThreadLocal<>();
/** Persisters may need to access this on reloading of the journal,
* for large message processing */
public static void setupThreadLocal(StorageManager manager) {
storageManagerThreadLocal.set(manager);
}
public static StorageManager getThreadLocal() {
return storageManagerThreadLocal.get();
}
private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class); private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class);
public enum JournalContent { public enum JournalContent {
@ -857,7 +845,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
Map<Long, Message> messages = new HashMap<>(); Map<Long, Message> messages = new HashMap<>();
readLock(); readLock();
setupThreadLocal(this);
try { try {
JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(this)); JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(this));
@ -935,15 +922,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
case JournalRecordIds.ADD_MESSAGE_PROTOCOL: { case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
Message message = MessagePersister.getInstance().decode(buff, null, pools); Message message = decodeMessage(pools, buff);
/* if (message instanceof LargeServerMessage) {
try {
((LargeServerMessage) message).finishParse();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
} */
messages.put(record.id, message); messages.put(record.id, message);
@ -1240,11 +1219,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
return info; return info;
} finally { } finally {
readUnLock(); readUnLock();
// need to clear it, otherwise we may have a permanent leak
setupThreadLocal(null);
} }
} }
private Message decodeMessage(CoreMessageObjectPools pools, ActiveMQBuffer buff) {
Message message = MessagePersister.getInstance().decode(buff, null, pools, this);
return message;
}
public void checkInvalidPageTransactions(PagingManager pagingManager, public void checkInvalidPageTransactions(PagingManager pagingManager,
Set<PageTransactionInfo> invalidPageTransactions) { Set<PageTransactionInfo> invalidPageTransactions) {
if (invalidPageTransactions != null && !invalidPageTransactions.isEmpty()) { if (invalidPageTransactions != null && !invalidPageTransactions.isEmpty()) {
@ -1795,7 +1777,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
if (pools == null) { if (pools == null) {
pools = new CoreMessageObjectPools(); pools = new CoreMessageObjectPools();
} }
Message message = MessagePersister.getInstance().decode(buff, null, pools); Message message = decodeMessage(pools, buff);
messages.put(record.id, message); messages.put(record.id, message);

View File

@ -569,7 +569,7 @@ public final class DescribeJournal {
return "ADD-MESSAGE is not supported any longer, use export/import"; return "ADD-MESSAGE is not supported any longer, use export/import";
} }
case ADD_MESSAGE_PROTOCOL: { case ADD_MESSAGE_PROTOCOL: {
Message message = MessagePersister.getInstance().decode(buffer, null, null); Message message = MessagePersister.getInstance().decode(buffer, null, null, storageManager);
return new MessageDescribe(message); return new MessageDescribe(message);
} }
case ADD_REF: { case ADD_REF: {

View File

@ -359,6 +359,8 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
LargeMessagePersister.getInstance().decode(buff, largeMessage, null); LargeMessagePersister.getInstance().decode(buff, largeMessage, null);
largeMessage.setStorageManager(this);
if (largeMessage.toMessage().containsProperty(Message.HDR_ORIG_MESSAGE_ID)) { if (largeMessage.toMessage().containsProperty(Message.HDR_ORIG_MESSAGE_ID)) {
// for compatibility: couple with old behaviour, copying the old file to avoid message loss // for compatibility: couple with old behaviour, copying the old file to avoid message loss
long originalMessageID = largeMessage.toMessage().getLongProperty(Message.HDR_ORIG_MESSAGE_ID); long originalMessageID = largeMessage.toMessage().getLongProperty(Message.HDR_ORIG_MESSAGE_ID);

View File

@ -44,7 +44,7 @@ public class LargeBody {
private long pendingRecordID = NO_PENDING_ID; private long pendingRecordID = NO_PENDING_ID;
final StorageManager storageManager; StorageManager storageManager;
private long messageID = -1; private long messageID = -1;
@ -69,6 +69,10 @@ public class LargeBody {
return storageManager; return storageManager;
} }
public void setStorageManager(StorageManager storageManager) {
this.storageManager = storageManager;
}
public ByteBuffer map() throws Exception { public ByteBuffer map() throws Exception {
ensureFileExists(true); ensureFileExists(true);
if (!file.isOpen()) { if (!file.isOpen()) {

View File

@ -252,6 +252,11 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
} }
} }
@Override
public void setStorageManager(StorageManager storageManager) {
this.largeBody.setStorageManager(storageManager);
}
@Override @Override
public Message copy() { public Message copy() {
SequentialFile newfile = storageManager.createFileForLargeMessage(messageID, durable); SequentialFile newfile = storageManager.createFileForLargeMessage(messageID, durable);

View File

@ -70,6 +70,11 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ
} }
@Override
public void setStorageManager(StorageManager storageManager) {
}
@Override @Override
public synchronized void addBytes(ActiveMQBuffer bytes) { public synchronized void addBytes(ActiveMQBuffer bytes) {
final int readableBytes = bytes.readableBytes(); final int readableBytes = bytes.readableBytes();

View File

@ -17,7 +17,9 @@
package org.apache.activemq.artemis.core.protocol; package org.apache.activemq.artemis.core.protocol;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
@ -28,6 +30,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupResp
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectReplyMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectReplyMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.FederationDownstreamConnectMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.FederationDownstreamConnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacketI;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnounceMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage;
@ -55,6 +58,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSen
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@ -88,6 +92,13 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
private static final long serialVersionUID = 3348673114388400766L; private static final long serialVersionUID = 3348673114388400766L;
private final StorageManager storageManager;
public ServerPacketDecoder(StorageManager storageManager) {
assert storageManager != null;
this.storageManager = storageManager;
}
private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) { private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
final SessionSendMessage sendMessage; final SessionSendMessage sendMessage;
@ -265,6 +276,14 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
packet.decode(in); packet.decode(in);
if (packet instanceof MessagePacketI) {
Message message = ((MessagePacketI)packet).getMessage();
if (message instanceof LargeServerMessage) {
assert storageManager != null;
((LargeServerMessage) message).setStorageManager(storageManager);
}
}
return packet; return packet;
} }
} }

View File

@ -1031,6 +1031,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
} }
LargeServerMessage message = currentLargeMessage; LargeServerMessage message = currentLargeMessage;
currentLargeMessage.setStorageManager(storageManager);
currentLargeMessage = null; currentLargeMessage = null;
session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage)message.toMessage(), storageManager), null, false, false); session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage)message.toMessage(), storageManager), null, false, false);
} }

View File

@ -127,7 +127,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
Executor connectionExecutor = server.getExecutorFactory().getExecutor(); Executor connectionExecutor = server.getExecutorFactory().getExecutor();
final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(), final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(server.getStorageManager()),
connection, incomingInterceptors, outgoingInterceptors, server.getNodeID(), connection, incomingInterceptors, outgoingInterceptors, server.getNodeID(),
connectionExecutor); connectionExecutor);

View File

@ -17,12 +17,13 @@
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl; import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
public class ReplicationPageWriteMessage extends PacketImpl { public class ReplicationPageWriteMessage extends PacketImpl implements MessagePacketI {
private int pageNumber; private int pageNumber;
@ -83,6 +84,17 @@ public class ReplicationPageWriteMessage extends PacketImpl {
return result; return result;
} }
@Override
public Message getMessage() {
return pagedMessage.getMessage();
}
@Override
public ReplicationPageWriteMessage replaceMessage(Message message) {
// nothing to be done
return this;
}
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (this == obj) if (this == obj)

View File

@ -64,5 +64,7 @@ public interface LargeServerMessage extends ReplicatedLargeMessage {
LargeBody getLargeBody(); LargeBody getLargeBody();
void setStorageManager(StorageManager storageManager);
void finishParse() throws Exception; void finishParse() throws Exception;
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.cluster;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.protocol.ServerPacketDecoder; import org.apache.activemq.artemis.core.protocol.ServerPacketDecoder;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder; import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder;
@ -31,6 +32,12 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM
ServerLocator locator; ServerLocator locator;
final StorageManager storageManager;
private ActiveMQServerSideProtocolManagerFactory(StorageManager storageManager) {
this.storageManager = storageManager;
}
@Override @Override
public ServerLocator getLocator() { public ServerLocator getLocator() {
return locator; return locator;
@ -41,15 +48,12 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM
this.locator = locator; this.locator = locator;
} }
public static ActiveMQServerSideProtocolManagerFactory getInstance(ServerLocator locator) { public static ActiveMQServerSideProtocolManagerFactory getInstance(ServerLocator locator, StorageManager storageManager) {
ActiveMQServerSideProtocolManagerFactory instance = new ActiveMQServerSideProtocolManagerFactory(); ActiveMQServerSideProtocolManagerFactory instance = new ActiveMQServerSideProtocolManagerFactory(storageManager);
instance.setLocator(locator); instance.setLocator(locator);
return instance; return instance;
} }
private ActiveMQServerSideProtocolManagerFactory() {
}
private static final long serialVersionUID = 1; private static final long serialVersionUID = 1;
@Override @Override
@ -66,7 +70,7 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM
@Override @Override
protected PacketDecoder createPacketDecoder() { protected PacketDecoder createPacketDecoder() {
return new ServerPacketDecoder(); return new ServerPacketDecoder(storageManager);
} }
} }
} }

View File

@ -227,7 +227,7 @@ public class BackupManager implements ActiveMQComponent {
backupServerLocator.setIdentity("backupLocatorFor='" + server + "'"); backupServerLocator.setIdentity("backupLocatorFor='" + server + "'");
backupServerLocator.setReconnectAttempts(-1); backupServerLocator.setReconnectAttempts(-1);
backupServerLocator.setInitialConnectAttempts(-1); backupServerLocator.setInitialConnectAttempts(-1);
backupServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(backupServerLocator)); backupServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(backupServerLocator, server.getStorageManager()));
} }
} }
@ -359,7 +359,7 @@ public class BackupManager implements ActiveMQComponent {
ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs); ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
locator.setClusterConnection(true); locator.setClusterConnection(true);
locator.setRetryInterval(retryInterval); locator.setRetryInterval(retryInterval);
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator)); locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, server.getStorageManager()));
return locator; return locator;
} }
return null; return null;

View File

@ -191,7 +191,7 @@ public class ClusterController implements ActiveMQComponent {
serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier()); serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier());
serverLocator.setMaxRetryInterval(config.getMaxRetryInterval()); serverLocator.setMaxRetryInterval(config.getMaxRetryInterval());
//this is used for replication so need to use the server packet decoder //this is used for replication so need to use the server packet decoder
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, server.getStorageManager()));
serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool()); serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool());
try { try {
serverLocator.initialize(); serverLocator.initialize();
@ -254,7 +254,7 @@ public class ClusterController implements ActiveMQComponent {
* @return the Cluster Control * @return the Cluster Control
*/ */
public ClusterControl connectToNodeInCluster(ClientSessionFactoryInternal sf) { public ClusterControl connectToNodeInCluster(ClientSessionFactoryInternal sf) {
sf.getServerLocator().setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(sf.getServerLocator())); sf.getServerLocator().setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(sf.getServerLocator(), server.getStorageManager()));
return new ClusterControl(sf, server); return new ClusterControl(sf, server);
} }

View File

@ -40,6 +40,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -79,6 +80,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
private final long targetNodeEventUID; private final long targetNodeEventUID;
private final StorageManager storageManager;
private final ServerLocatorInternal discoveryLocator; private final ServerLocatorInternal discoveryLocator;
private final String storeAndForwardPrefix; private final String storeAndForwardPrefix;
@ -111,7 +114,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
final SimpleString managementNotificationAddress, final SimpleString managementNotificationAddress,
final MessageFlowRecord flowRecord, final MessageFlowRecord flowRecord,
final TransportConfiguration connector, final TransportConfiguration connector,
final String storeAndForwardPrefix) { final String storeAndForwardPrefix,
final StorageManager storageManager) {
super(targetLocator, initialConnectAttempts, reconnectAttempts, 0, // reconnectAttemptsOnSameNode means nothing on the clustering bridge since we always try the same super(targetLocator, initialConnectAttempts, reconnectAttempts, 0, // reconnectAttemptsOnSameNode means nothing on the clustering bridge since we always try the same
retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, server, ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType())); retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, server, ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType()));
@ -134,11 +138,13 @@ public class ClusterConnectionBridge extends BridgeImpl {
} }
this.storeAndForwardPrefix = storeAndForwardPrefix; this.storeAndForwardPrefix = storeAndForwardPrefix;
this.storageManager = storageManager;
} }
@Override @Override
protected ClientSessionFactoryInternal createSessionFactory() throws Exception { protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, storageManager));
ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) serverLocator.createSessionFactory(targetNodeID); ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) serverLocator.createSessionFactory(targetNodeID);
//if it is null then its possible the broker was removed after a disconnect so lets try the original connectors //if it is null then its possible the broker was removed after a disconnect so lets try the original connectors
if (factory == null) { if (factory == null) {

View File

@ -628,7 +628,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
serverLocator.setAfterConnectionInternalListener(this); serverLocator.setAfterConnectionInternalListener(this);
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, server.getStorageManager()));
serverLocator.start(server.getExecutorFactory().getExecutor()); serverLocator.start(server.getExecutorFactory().getExecutor());
} }
@ -816,7 +816,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
targetLocator.setAfterConnectionInternalListener(this); targetLocator.setAfterConnectionInternalListener(this);
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, server.getStorageManager()));
targetLocator.setNodeID(nodeId); targetLocator.setNodeID(nodeId);
@ -830,7 +830,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
targetLocator.addIncomingInterceptor(new IncomingInterceptorLookingForExceptionMessage(manager, executorFactory.getExecutor())); targetLocator.addIncomingInterceptor(new IncomingInterceptorLookingForExceptionMessage(manager, executorFactory.getExecutor()));
MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, eventUID, targetNodeID, connector, queueName, queue); MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, eventUID, targetNodeID, connector, queueName, queue);
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, manager, targetLocator, serverLocator, initialConnectAttempts, reconnectAttempts, retryInterval, retryIntervalMultiplier, maxRetryInterval, nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), record.getQueueName(), record.getQueue(), executorFactory.getExecutor(), null, null, scheduledExecutor, null, useDuplicateDetection, clusterUser, clusterPassword, server, managementService.getManagementAddress(), managementService.getManagementNotificationAddress(), record, record.getConnector(), storeAndForwardPrefix); ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, manager, targetLocator, serverLocator, initialConnectAttempts, reconnectAttempts, retryInterval, retryIntervalMultiplier, maxRetryInterval, nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), record.getQueueName(), record.getQueue(), executorFactory.getExecutor(), null, null, scheduledExecutor, null, useDuplicateDetection, clusterUser, clusterPassword, server, managementService.getManagementAddress(), managementService.getManagementNotificationAddress(), record, record.getConnector(), storeAndForwardPrefix, server.getStorageManager());
targetLocator.setIdentity("(Cluster-connection-bridge::" + bridge.toString() + "::" + this.toString() + ")"); targetLocator.setIdentity("(Cluster-connection-bridge::" + bridge.toString() + "::" + this.toString() + ")");

View File

@ -49,6 +49,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
private ActiveMQServer parentServer; private ActiveMQServer parentServer;
private ServerLocator locator; private ServerLocator locator;
private final ClusterController clusterController; private final ClusterController clusterController;
private final StorageManager storageManager;
public BackupRecoveryJournalLoader(PostOffice postOffice, public BackupRecoveryJournalLoader(PostOffice postOffice,
PagingManager pagingManager, PagingManager pagingManager,
@ -66,6 +67,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
this.parentServer = parentServer; this.parentServer = parentServer;
this.locator = locator; this.locator = locator;
this.clusterController = clusterController; this.clusterController = clusterController;
this.storageManager = storageManager;
} }
@Override @Override
@ -88,7 +90,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
ResourceManager resourceManager, ResourceManager resourceManager,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception { Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer.getStorageManager()); ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer.getStorageManager());
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator)); locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, storageManager));
try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) { try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) {
scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID()); scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID());

View File

@ -139,7 +139,7 @@ public class LiveOnlyActivation extends Activation {
try { try {
scaleDownServerLocator = ScaleDownPolicy.getScaleDownConnector(scaleDownPolicy, activeMQServer); scaleDownServerLocator = ScaleDownPolicy.getScaleDownConnector(scaleDownPolicy, activeMQServer);
//use a Node Locator to connect to the cluster //use a Node Locator to connect to the cluster
scaleDownServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(scaleDownServerLocator)); scaleDownServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(scaleDownServerLocator, activeMQServer.getStorageManager()));
LiveNodeLocator nodeLocator = scaleDownPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForScaleDown(activeMQServer) : new NamedLiveNodeLocatorForScaleDown(scaleDownPolicy.getGroupName(), activeMQServer); LiveNodeLocator nodeLocator = scaleDownPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForScaleDown(activeMQServer) : new NamedLiveNodeLocatorForScaleDown(scaleDownPolicy.getGroupName(), activeMQServer);
scaleDownServerLocator.addClusterTopologyListener(nodeLocator); scaleDownServerLocator.addClusterTopologyListener(nodeLocator);

View File

@ -25,7 +25,6 @@ import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -95,18 +94,16 @@ public class EmbedMessageUtil {
} }
private static Message readEncoded(ICoreMessage message, StorageManager storageManager, ActiveMQBuffer buffer) { private static Message readEncoded(ICoreMessage message, StorageManager storageManager, ActiveMQBuffer buffer) {
AbstractJournalStorageManager.setupThreadLocal(storageManager);
try { try {
Message returnMessage = MessagePersister.getInstance().decode(buffer, null, null); Message returnMessage = MessagePersister.getInstance().decode(buffer, null, null, storageManager);
if (returnMessage instanceof LargeServerMessage) {
((LargeServerMessage)returnMessage).setStorageManager(storageManager);
}
returnMessage.setMessageID(message.getMessageID()); returnMessage.setMessageID(message.getMessageID());
return returnMessage; return returnMessage;
} catch (Exception e) { } catch (Exception e) {
logger.warn(e.getMessage(), e); logger.warn(e.getMessage(), e);
return message; return message;
} finally {
AbstractJournalStorageManager.setupThreadLocal(null);
} }
} }

View File

@ -24,6 +24,8 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister; import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.persistence.PersisterIDs.MAX_PERSISTERS; import static org.apache.activemq.artemis.core.persistence.PersisterIDs.MAX_PERSISTERS;
@ -105,11 +107,20 @@ public class MessagePersister implements Persister<Message> {
@Override @Override
public Message decode(ActiveMQBuffer buffer, Message record, CoreMessageObjectPools pools) { public Message decode(ActiveMQBuffer buffer, Message record, CoreMessageObjectPools pools) {
return decode(buffer, record, pools, null);
}
public Message decode(ActiveMQBuffer buffer, Message record, CoreMessageObjectPools pools, StorageManager storageManager) {
byte protocol = buffer.readByte(); byte protocol = buffer.readByte();
Persister<Message> persister = getPersister(protocol); Persister<Message> persister = getPersister(protocol);
if (persister == null) { if (persister == null) {
throw new NullPointerException("couldn't find factory for type=" + protocol); throw new NullPointerException("couldn't find factory for type=" + protocol);
} }
return persister.decode(buffer, record, pools); Message message = persister.decode(buffer, record, pools);
if (message instanceof LargeServerMessage) {
((LargeServerMessage) message).setStorageManager(storageManager);
}
return message;
} }
} }

View File

@ -126,7 +126,7 @@ public class ClusterControllerTest extends ClusterTestBase {
@Test @Test
public void controlWithDifferentConnector() throws Exception { public void controlWithDifferentConnector() throws Exception {
try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) { try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) {
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator)); locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, servers[0].getStorageManager()));
ClusterController controller = new ClusterController(getServer(0), getServer(0).getScheduledPool()); ClusterController controller = new ClusterController(getServer(0), getServer(0).getScheduledPool());
ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory()); ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory());
clusterControl.authorize(); clusterControl.authorize();
@ -136,7 +136,7 @@ public class ClusterControllerTest extends ClusterTestBase {
@Test @Test
public void controlWithDifferentPassword() throws Exception { public void controlWithDifferentPassword() throws Exception {
try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) { try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) {
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator)); locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, servers[0].getStorageManager()));
ClusterController controller = new ClusterController(getServer(1), getServer(1).getScheduledPool()); ClusterController controller = new ClusterController(getServer(1), getServer(1).getScheduledPool());
ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory()); ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory());
try { try {

View File

@ -1396,7 +1396,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
setSessionFactoryCreateLocator(node, ha, serverTotc); setSessionFactoryCreateLocator(node, ha, serverTotc);
locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locators[node])); locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locators[node], servers[node].getStorageManager()));
locators[node].setBlockOnNonDurableSend(true).setBlockOnDurableSend(true); locators[node].setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
addServerLocator(locators[node]); addServerLocator(locators[node]);

View File

@ -147,6 +147,10 @@
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions> <javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
<instance>${basedir}/target/replicated-static0</instance> <instance>${basedir}/target/replicated-static0</instance>
<configuration>${basedir}/target/classes/servers/replicated-static0</configuration> <configuration>${basedir}/target/classes/servers/replicated-static0</configuration>
<args>
<arg>--java-options</arg>
<arg>-ea</arg>
</args>
</configuration> </configuration>
</execution> </execution>
<execution> <execution>
@ -160,6 +164,10 @@
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions> <javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
<instance>${basedir}/target/replicated-static1</instance> <instance>${basedir}/target/replicated-static1</instance>
<configuration>${basedir}/target/classes/servers/replicated-static1</configuration> <configuration>${basedir}/target/classes/servers/replicated-static1</configuration>
<args>
<arg>--java-options</arg>
<arg>-ea</arg>
</args>
</configuration> </configuration>
</execution> </execution>
<execution> <execution>

View File

@ -115,6 +115,11 @@ public class SoakPagingTest extends SmokeTestBase {
public void produce(ConnectionFactory factory) { public void produce(ConnectionFactory factory) {
try { try {
StringBuffer bufferlarge = new StringBuffer();
while (bufferlarge.length() < 110000) {
bufferlarge.append("asdflkajdhsf akljsdfh akljsdfh alksjdfh alkdjsf ");
}
Connection connection = factory.createConnection("admin", "admin"); Connection connection = factory.createConnection("admin", "admin");
connection.start(); connection.start();
@ -125,7 +130,13 @@ public class SoakPagingTest extends SmokeTestBase {
int i = 0; int i = 0;
while (true) { while (true) {
Message message = session.createTextMessage("fkjdslkfjdskljf;lkdsjf;kdsajf;lkjdf;kdsajf;kjdsa;flkjdsa;lfkjdsa;flkj;dsakjf;dsajf;askjd;fkj;dsajflaskfja;fdlkajs;lfdkja;kfj;dsakfj;akdsjf;dsakjf;akfj;lakdsjf;lkasjdf;ksajf;kjdsa;fkj;adskjf;akdsjf;kja;sdkfj;akdsjf;akjdsf;adskjf;akdsjf;askfj;aksjfkdjafndmnfmdsnfjadshfjdsalkfjads;fkjdsa;kfja;skfj;akjfd;akjfd;ksaj;fkja;kfj;dsakjf;dsakjf;dksjf;akdsjf;kdsajf");
Message message;
if (i % 100 == 0) {
message = session.createTextMessage(bufferlarge.toString());
} else {
message = session.createTextMessage("fkjdslkfjdskljf;lkdsjf;kdsajf;lkjdf;kdsajf;kjdsa;flkjdsa;lfkjdsa;flkj;dsakjf;dsajf;askjd;fkj;dsajflaskfja;fdlkajs;lfdkja;kfj;dsakfj;akdsjf;dsakjf;akfj;lakdsjf;lkasjdf;ksajf;kjdsa;fkj;adskjf;akdsjf;kja;sdkfj;akdsjf;akjdsf;adskjf;akdsjf;askfj;aksjfkdjafndmnfmdsnfjadshfjdsalkfjads;fkjdsa;kfja;skfj;akjfd;akjfd;ksaj;fkja;kfj;dsakjf;dsakjf;dksjf;akdsjf;kdsajf");
}
messageProducer.send(message); messageProducer.send(message);
produced.incrementAndGet(); produced.incrementAndGet();