This commit is contained in:
Clebert Suconic 2020-03-25 21:54:46 -04:00
commit 3d5bd38d07
29 changed files with 148 additions and 63 deletions

View File

@ -91,7 +91,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
private volatile AmqpReadableBuffer parsingData;
private final StorageManager storageManager;
private StorageManager storageManager;
public AMQPLargeMessage(long id,
long messageFormat,
@ -183,6 +183,12 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
return largeBody.getStorageManager();
}
@Override
public void setStorageManager(StorageManager storageManager) {
largeBody.setStorageManager(storageManager);
this.storageManager = storageManager;
}
@Override
public final boolean isDurable() {
if (fileDurable != null) {

View File

@ -22,7 +22,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
@ -116,7 +115,7 @@ public class AMQPLargeMessagePersister extends MessagePersister {
properties = null;
}
AMQPLargeMessage largeMessage = new AMQPLargeMessage(id, format, properties, null, AbstractJournalStorageManager.getThreadLocal());
AMQPLargeMessage largeMessage = new AMQPLargeMessage(id, format, properties, null, null);
largeMessage.setFileDurable(durable);
if (address != null) {

View File

@ -100,7 +100,6 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
@Override
public void onDelivery(Consumer<? super MessageReference> onDelivery) {
assert this.onDelivery == null;
this.onDelivery = onDelivery;
}

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
@ -381,6 +382,7 @@ public final class Page implements Comparable<Page> {
fileBuffer.position(endPosition + 1);
assert fileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte";
msg.initMessage(storage);
assert msg.getMessage() instanceof LargeServerMessage && ((LargeServerMessage)msg.getMessage()).getStorageManager() != null || !(msg.getMessage() instanceof LargeServerMessage);
if (logger.isTraceEnabled()) {
logger.tracef("Reading message %s on pageId=%d for address=%s", msg, pageId, storeName);
}

View File

@ -89,10 +89,17 @@ public class PagedMessageImpl implements PagedMessage {
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(largeMessageLazyData);
lgMessage = LargeMessagePersister.getInstance().decode(buffer, lgMessage, null);
if (lgMessage.toMessage() instanceof LargeServerMessage) {
((LargeServerMessage)lgMessage.toMessage()).setStorageManager(storage);
}
lgMessage.toMessage().usageUp();
lgMessage.setPaged();
this.message = lgMessage.toMessage();
largeMessageLazyData = null;
} else {
if (message != null && message instanceof LargeServerMessage) {
((LargeServerMessage)message).setStorageManager(storageManager);
}
}
}
@ -123,10 +130,11 @@ public class PagedMessageImpl implements PagedMessage {
} else {
this.message = storageManager.createLargeMessage().toMessage();
LargeMessagePersister.getInstance().decode(buffer, (LargeServerMessage) message, null);
((LargeServerMessage) message).setStorageManager(storageManager);
((LargeServerMessage) message).toMessage().usageUp();
}
} else {
this.message = MessagePersister.getInstance().decode(buffer, null, null);
this.message = MessagePersister.getInstance().decode(buffer, null, null, storageManager);
}
int queueIDsSize = buffer.readInt();

View File

@ -135,18 +135,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
protected static final int CRITICAL_STOP_2 = 2;
public static ThreadLocal<StorageManager> storageManagerThreadLocal = new ThreadLocal<>();
/** Persisters may need to access this on reloading of the journal,
* for large message processing */
public static void setupThreadLocal(StorageManager manager) {
storageManagerThreadLocal.set(manager);
}
public static StorageManager getThreadLocal() {
return storageManagerThreadLocal.get();
}
private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class);
public enum JournalContent {
@ -857,7 +845,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
Map<Long, Message> messages = new HashMap<>();
readLock();
setupThreadLocal(this);
try {
JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(this));
@ -935,15 +922,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
Message message = MessagePersister.getInstance().decode(buff, null, pools);
/* if (message instanceof LargeServerMessage) {
try {
((LargeServerMessage) message).finishParse();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
} */
Message message = decodeMessage(pools, buff);
messages.put(record.id, message);
@ -1240,11 +1219,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
return info;
} finally {
readUnLock();
// need to clear it, otherwise we may have a permanent leak
setupThreadLocal(null);
}
}
private Message decodeMessage(CoreMessageObjectPools pools, ActiveMQBuffer buff) {
Message message = MessagePersister.getInstance().decode(buff, null, pools, this);
return message;
}
public void checkInvalidPageTransactions(PagingManager pagingManager,
Set<PageTransactionInfo> invalidPageTransactions) {
if (invalidPageTransactions != null && !invalidPageTransactions.isEmpty()) {
@ -1795,7 +1777,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
if (pools == null) {
pools = new CoreMessageObjectPools();
}
Message message = MessagePersister.getInstance().decode(buff, null, pools);
Message message = decodeMessage(pools, buff);
messages.put(record.id, message);

View File

@ -569,7 +569,7 @@ public final class DescribeJournal {
return "ADD-MESSAGE is not supported any longer, use export/import";
}
case ADD_MESSAGE_PROTOCOL: {
Message message = MessagePersister.getInstance().decode(buffer, null, null);
Message message = MessagePersister.getInstance().decode(buffer, null, null, storageManager);
return new MessageDescribe(message);
}
case ADD_REF: {

View File

@ -359,6 +359,8 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
LargeMessagePersister.getInstance().decode(buff, largeMessage, null);
largeMessage.setStorageManager(this);
if (largeMessage.toMessage().containsProperty(Message.HDR_ORIG_MESSAGE_ID)) {
// for compatibility: couple with old behaviour, copying the old file to avoid message loss
long originalMessageID = largeMessage.toMessage().getLongProperty(Message.HDR_ORIG_MESSAGE_ID);

View File

@ -44,7 +44,7 @@ public class LargeBody {
private long pendingRecordID = NO_PENDING_ID;
final StorageManager storageManager;
StorageManager storageManager;
private long messageID = -1;
@ -69,6 +69,10 @@ public class LargeBody {
return storageManager;
}
public void setStorageManager(StorageManager storageManager) {
this.storageManager = storageManager;
}
public ByteBuffer map() throws Exception {
ensureFileExists(true);
if (!file.isOpen()) {

View File

@ -252,6 +252,11 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
}
}
@Override
public void setStorageManager(StorageManager storageManager) {
this.largeBody.setStorageManager(storageManager);
}
@Override
public Message copy() {
SequentialFile newfile = storageManager.createFileForLargeMessage(messageID, durable);

View File

@ -70,6 +70,11 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ
}
@Override
public void setStorageManager(StorageManager storageManager) {
}
@Override
public synchronized void addBytes(ActiveMQBuffer bytes) {
final int readableBytes = bytes.readableBytes();

View File

@ -17,7 +17,9 @@
package org.apache.activemq.artemis.core.protocol;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
@ -28,6 +30,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupResp
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectReplyMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.FederationDownstreamConnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacketI;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage;
@ -55,6 +58,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSen
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@ -88,6 +92,13 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
private static final long serialVersionUID = 3348673114388400766L;
private final StorageManager storageManager;
public ServerPacketDecoder(StorageManager storageManager) {
assert storageManager != null;
this.storageManager = storageManager;
}
private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
final SessionSendMessage sendMessage;
@ -265,6 +276,14 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
packet.decode(in);
if (packet instanceof MessagePacketI) {
Message message = ((MessagePacketI)packet).getMessage();
if (message instanceof LargeServerMessage) {
assert storageManager != null;
((LargeServerMessage) message).setStorageManager(storageManager);
}
}
return packet;
}
}

View File

@ -1031,6 +1031,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
LargeServerMessage message = currentLargeMessage;
currentLargeMessage.setStorageManager(storageManager);
currentLargeMessage = null;
session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage)message.toMessage(), storageManager), null, false, false);
}

View File

@ -127,7 +127,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
Executor connectionExecutor = server.getExecutorFactory().getExecutor();
final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(),
final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(server.getStorageManager()),
connection, incomingInterceptors, outgoingInterceptors, server.getNodeID(),
connectionExecutor);

View File

@ -17,12 +17,13 @@
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
public class ReplicationPageWriteMessage extends PacketImpl {
public class ReplicationPageWriteMessage extends PacketImpl implements MessagePacketI {
private int pageNumber;
@ -83,6 +84,17 @@ public class ReplicationPageWriteMessage extends PacketImpl {
return result;
}
@Override
public Message getMessage() {
return pagedMessage.getMessage();
}
@Override
public ReplicationPageWriteMessage replaceMessage(Message message) {
// nothing to be done
return this;
}
@Override
public boolean equals(Object obj) {
if (this == obj)

View File

@ -64,5 +64,7 @@ public interface LargeServerMessage extends ReplicatedLargeMessage {
LargeBody getLargeBody();
void setStorageManager(StorageManager storageManager);
void finishParse() throws Exception;
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.cluster;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.protocol.ServerPacketDecoder;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder;
@ -31,6 +32,12 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM
ServerLocator locator;
final StorageManager storageManager;
private ActiveMQServerSideProtocolManagerFactory(StorageManager storageManager) {
this.storageManager = storageManager;
}
@Override
public ServerLocator getLocator() {
return locator;
@ -41,15 +48,12 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM
this.locator = locator;
}
public static ActiveMQServerSideProtocolManagerFactory getInstance(ServerLocator locator) {
ActiveMQServerSideProtocolManagerFactory instance = new ActiveMQServerSideProtocolManagerFactory();
public static ActiveMQServerSideProtocolManagerFactory getInstance(ServerLocator locator, StorageManager storageManager) {
ActiveMQServerSideProtocolManagerFactory instance = new ActiveMQServerSideProtocolManagerFactory(storageManager);
instance.setLocator(locator);
return instance;
}
private ActiveMQServerSideProtocolManagerFactory() {
}
private static final long serialVersionUID = 1;
@Override
@ -66,7 +70,7 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM
@Override
protected PacketDecoder createPacketDecoder() {
return new ServerPacketDecoder();
return new ServerPacketDecoder(storageManager);
}
}
}

View File

@ -227,7 +227,7 @@ public class BackupManager implements ActiveMQComponent {
backupServerLocator.setIdentity("backupLocatorFor='" + server + "'");
backupServerLocator.setReconnectAttempts(-1);
backupServerLocator.setInitialConnectAttempts(-1);
backupServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(backupServerLocator));
backupServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(backupServerLocator, server.getStorageManager()));
}
}
@ -359,7 +359,7 @@ public class BackupManager implements ActiveMQComponent {
ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
locator.setClusterConnection(true);
locator.setRetryInterval(retryInterval);
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, server.getStorageManager()));
return locator;
}
return null;

View File

@ -191,7 +191,7 @@ public class ClusterController implements ActiveMQComponent {
serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier());
serverLocator.setMaxRetryInterval(config.getMaxRetryInterval());
//this is used for replication so need to use the server packet decoder
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, server.getStorageManager()));
serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool());
try {
serverLocator.initialize();
@ -254,7 +254,7 @@ public class ClusterController implements ActiveMQComponent {
* @return the Cluster Control
*/
public ClusterControl connectToNodeInCluster(ClientSessionFactoryInternal sf) {
sf.getServerLocator().setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(sf.getServerLocator()));
sf.getServerLocator().setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(sf.getServerLocator(), server.getStorageManager()));
return new ClusterControl(sf, server);
}

View File

@ -40,6 +40,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -79,6 +80,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
private final long targetNodeEventUID;
private final StorageManager storageManager;
private final ServerLocatorInternal discoveryLocator;
private final String storeAndForwardPrefix;
@ -111,7 +114,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
final SimpleString managementNotificationAddress,
final MessageFlowRecord flowRecord,
final TransportConfiguration connector,
final String storeAndForwardPrefix) {
final String storeAndForwardPrefix,
final StorageManager storageManager) {
super(targetLocator, initialConnectAttempts, reconnectAttempts, 0, // reconnectAttemptsOnSameNode means nothing on the clustering bridge since we always try the same
retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, server, ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType()));
@ -134,11 +138,13 @@ public class ClusterConnectionBridge extends BridgeImpl {
}
this.storeAndForwardPrefix = storeAndForwardPrefix;
this.storageManager = storageManager;
}
@Override
protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, storageManager));
ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) serverLocator.createSessionFactory(targetNodeID);
//if it is null then its possible the broker was removed after a disconnect so lets try the original connectors
if (factory == null) {

View File

@ -628,7 +628,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
serverLocator.setAfterConnectionInternalListener(this);
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, server.getStorageManager()));
serverLocator.start(server.getExecutorFactory().getExecutor());
}
@ -816,7 +816,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
targetLocator.setAfterConnectionInternalListener(this);
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, server.getStorageManager()));
targetLocator.setNodeID(nodeId);
@ -830,7 +830,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
targetLocator.addIncomingInterceptor(new IncomingInterceptorLookingForExceptionMessage(manager, executorFactory.getExecutor()));
MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, eventUID, targetNodeID, connector, queueName, queue);
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, manager, targetLocator, serverLocator, initialConnectAttempts, reconnectAttempts, retryInterval, retryIntervalMultiplier, maxRetryInterval, nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), record.getQueueName(), record.getQueue(), executorFactory.getExecutor(), null, null, scheduledExecutor, null, useDuplicateDetection, clusterUser, clusterPassword, server, managementService.getManagementAddress(), managementService.getManagementNotificationAddress(), record, record.getConnector(), storeAndForwardPrefix);
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, manager, targetLocator, serverLocator, initialConnectAttempts, reconnectAttempts, retryInterval, retryIntervalMultiplier, maxRetryInterval, nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), record.getQueueName(), record.getQueue(), executorFactory.getExecutor(), null, null, scheduledExecutor, null, useDuplicateDetection, clusterUser, clusterPassword, server, managementService.getManagementAddress(), managementService.getManagementNotificationAddress(), record, record.getConnector(), storeAndForwardPrefix, server.getStorageManager());
targetLocator.setIdentity("(Cluster-connection-bridge::" + bridge.toString() + "::" + this.toString() + ")");

View File

@ -49,6 +49,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
private ActiveMQServer parentServer;
private ServerLocator locator;
private final ClusterController clusterController;
private final StorageManager storageManager;
public BackupRecoveryJournalLoader(PostOffice postOffice,
PagingManager pagingManager,
@ -66,6 +67,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
this.parentServer = parentServer;
this.locator = locator;
this.clusterController = clusterController;
this.storageManager = storageManager;
}
@Override
@ -88,7 +90,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
ResourceManager resourceManager,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer.getStorageManager());
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, storageManager));
try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) {
scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID());

View File

@ -139,7 +139,7 @@ public class LiveOnlyActivation extends Activation {
try {
scaleDownServerLocator = ScaleDownPolicy.getScaleDownConnector(scaleDownPolicy, activeMQServer);
//use a Node Locator to connect to the cluster
scaleDownServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(scaleDownServerLocator));
scaleDownServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(scaleDownServerLocator, activeMQServer.getStorageManager()));
LiveNodeLocator nodeLocator = scaleDownPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForScaleDown(activeMQServer) : new NamedLiveNodeLocatorForScaleDown(scaleDownPolicy.getGroupName(), activeMQServer);
scaleDownServerLocator.addClusterTopologyListener(nodeLocator);

View File

@ -25,7 +25,6 @@ import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.jboss.logging.Logger;
@ -95,18 +94,16 @@ public class EmbedMessageUtil {
}
private static Message readEncoded(ICoreMessage message, StorageManager storageManager, ActiveMQBuffer buffer) {
AbstractJournalStorageManager.setupThreadLocal(storageManager);
try {
Message returnMessage = MessagePersister.getInstance().decode(buffer, null, null);
Message returnMessage = MessagePersister.getInstance().decode(buffer, null, null, storageManager);
if (returnMessage instanceof LargeServerMessage) {
((LargeServerMessage)returnMessage).setStorageManager(storageManager);
}
returnMessage.setMessageID(message.getMessageID());
return returnMessage;
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return message;
} finally {
AbstractJournalStorageManager.setupThreadLocal(null);
}
}

View File

@ -24,6 +24,8 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.persistence.PersisterIDs.MAX_PERSISTERS;
@ -105,11 +107,20 @@ public class MessagePersister implements Persister<Message> {
@Override
public Message decode(ActiveMQBuffer buffer, Message record, CoreMessageObjectPools pools) {
return decode(buffer, record, pools, null);
}
public Message decode(ActiveMQBuffer buffer, Message record, CoreMessageObjectPools pools, StorageManager storageManager) {
byte protocol = buffer.readByte();
Persister<Message> persister = getPersister(protocol);
if (persister == null) {
throw new NullPointerException("couldn't find factory for type=" + protocol);
}
return persister.decode(buffer, record, pools);
Message message = persister.decode(buffer, record, pools);
if (message instanceof LargeServerMessage) {
((LargeServerMessage) message).setStorageManager(storageManager);
}
return message;
}
}

View File

@ -126,7 +126,7 @@ public class ClusterControllerTest extends ClusterTestBase {
@Test
public void controlWithDifferentConnector() throws Exception {
try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) {
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, servers[0].getStorageManager()));
ClusterController controller = new ClusterController(getServer(0), getServer(0).getScheduledPool());
ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory());
clusterControl.authorize();
@ -136,7 +136,7 @@ public class ClusterControllerTest extends ClusterTestBase {
@Test
public void controlWithDifferentPassword() throws Exception {
try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) {
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, servers[0].getStorageManager()));
ClusterController controller = new ClusterController(getServer(1), getServer(1).getScheduledPool());
ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory());
try {

View File

@ -1396,7 +1396,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
setSessionFactoryCreateLocator(node, ha, serverTotc);
locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locators[node]));
locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locators[node], servers[node].getStorageManager()));
locators[node].setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
addServerLocator(locators[node]);

View File

@ -147,6 +147,10 @@
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
<instance>${basedir}/target/replicated-static0</instance>
<configuration>${basedir}/target/classes/servers/replicated-static0</configuration>
<args>
<arg>--java-options</arg>
<arg>-ea</arg>
</args>
</configuration>
</execution>
<execution>
@ -160,6 +164,10 @@
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
<instance>${basedir}/target/replicated-static1</instance>
<configuration>${basedir}/target/classes/servers/replicated-static1</configuration>
<args>
<arg>--java-options</arg>
<arg>-ea</arg>
</args>
</configuration>
</execution>
<execution>

View File

@ -115,6 +115,11 @@ public class SoakPagingTest extends SmokeTestBase {
public void produce(ConnectionFactory factory) {
try {
StringBuffer bufferlarge = new StringBuffer();
while (bufferlarge.length() < 110000) {
bufferlarge.append("asdflkajdhsf akljsdfh akljsdfh alksjdfh alkdjsf ");
}
Connection connection = factory.createConnection("admin", "admin");
connection.start();
@ -125,7 +130,13 @@ public class SoakPagingTest extends SmokeTestBase {
int i = 0;
while (true) {
Message message = session.createTextMessage("fkjdslkfjdskljf;lkdsjf;kdsajf;lkjdf;kdsajf;kjdsa;flkjdsa;lfkjdsa;flkj;dsakjf;dsajf;askjd;fkj;dsajflaskfja;fdlkajs;lfdkja;kfj;dsakfj;akdsjf;dsakjf;akfj;lakdsjf;lkasjdf;ksajf;kjdsa;fkj;adskjf;akdsjf;kja;sdkfj;akdsjf;akjdsf;adskjf;akdsjf;askfj;aksjfkdjafndmnfmdsnfjadshfjdsalkfjads;fkjdsa;kfja;skfj;akjfd;akjfd;ksaj;fkja;kfj;dsakjf;dsakjf;dksjf;akdsjf;kdsajf");
Message message;
if (i % 100 == 0) {
message = session.createTextMessage(bufferlarge.toString());
} else {
message = session.createTextMessage("fkjdslkfjdskljf;lkdsjf;kdsajf;lkjdf;kdsajf;kjdsa;flkjdsa;lfkjdsa;flkj;dsakjf;dsajf;askjd;fkj;dsajflaskfja;fdlkajs;lfdkja;kfj;dsakfj;akdsjf;dsakjf;akfj;lakdsjf;lkasjdf;ksajf;kjdsa;fkj;adskjf;akdsjf;kja;sdkfj;akdsjf;akjdsf;adskjf;akdsjf;askfj;aksjfkdjafndmnfmdsnfjadshfjdsalkfjads;fkjdsa;kfja;skfj;akjfd;akjfd;ksaj;fkja;kfj;dsakjf;dsakjf;dksjf;akdsjf;kdsajf");
}
messageProducer.send(message);
produced.incrementAndGet();