This commit is contained in:
Clebert Suconic 2017-06-29 10:14:07 -04:00
commit aa932141f8
11 changed files with 203 additions and 19 deletions

View File

@ -1565,7 +1565,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
if (largeServerMessage.getPendingRecordID() >= 0) {
try {
confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
largeServerMessage.setPendingRecordID(-1);
largeServerMessage.setPendingRecordID(LargeServerMessage.NO_PENDING_ID);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}

View File

@ -274,7 +274,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeMsgId);
}
if (replicator != null) {
replicator.largeMessageDelete(largeMsgId);
replicator.largeMessageDelete(largeMsgId, JournalStorageManager.this);
}
}
largeMessagesToDelete.clear();
@ -375,10 +375,17 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
journalFF.releaseBuffer(buffer);
}
public long storePendingLargeMessage(final long messageID) throws Exception {
public long storePendingLargeMessage(final long messageID, long recordID) throws Exception {
readLock();
try {
long recordID = generateID();
if (recordID == LargeServerMessage.NO_PENDING_ID) {
recordID = generateID();
} else {
//this means the large message doesn't
//have a pendingRecordID, but one has been
//generated (coming from live server) for use.
recordID = -recordID;
}
messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new PendingLargeMessageEncoding(messageID), true, getContext(true));
@ -396,7 +403,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
// And the client won't be waiting for the actual file to be deleted.
// We set a temporary record (short lived) on the journal
// to avoid a situation where the server is restarted and pending large message stays on forever
largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID()));
largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID(), largeServerMessage.getPendingRecordID()));
} catch (Exception e) {
throw new ActiveMQInternalErrorException(e.getMessage(), e);
}
@ -427,7 +434,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
readLock();
try {
if (replicator != null) {
replicator.largeMessageDelete(largeServerMessage.getMessageID());
replicator.largeMessageDelete(largeServerMessage.getMessageID(), JournalStorageManager.this);
}
file.delete();
@ -475,7 +482,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
if (largeMessage.isDurable()) {
// We store a marker on the journal that the large file is pending
long pendingRecordID = storePendingLargeMessage(id);
long pendingRecordID = storePendingLargeMessage(id, LargeServerMessage.NO_PENDING_ID);
largeMessage.setPendingRecordID(pendingRecordID);
}

View File

@ -42,7 +42,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
private final JournalStorageManager storageManager;
private long pendingRecordID = -1;
private long pendingRecordID = NO_PENDING_ID;
private boolean paged;

View File

@ -158,4 +158,14 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
storageManager.addBytesToLargeMessage(appendFile, mainLM.getMessageID(), bytes);
}
@Override
public void setPendingRecordID(long pendingRecordID) {
mainLM.setPendingRecordID(pendingRecordID);
}
@Override
public long getPendingRecordID() {
return mainLM.getPendingRecordID();
}
}

View File

@ -23,31 +23,40 @@ import org.apache.activemq.artemis.utils.DataConstants;
public class ReplicationLargeMessageEndMessage extends PacketImpl {
long messageId;
long pendingRecordId;
public ReplicationLargeMessageEndMessage() {
super(PacketImpl.REPLICATION_LARGE_MESSAGE_END);
}
public ReplicationLargeMessageEndMessage(final long messageId) {
public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId) {
this();
this.messageId = messageId;
//we use negative value to indicate that this id is pre-generated by live node
//so that it won't be generated at backup.
//see https://issues.apache.org/jira/browse/ARTEMIS-1221
this.pendingRecordId = -pendingRecordId;
}
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
DataConstants.SIZE_LONG; // buffer.writeLong(messageId);
DataConstants.SIZE_LONG + // buffer.writeLong(messageId)
DataConstants.SIZE_LONG; // buffer.writeLong(pendingRecordId);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeLong(messageId);
buffer.writeLong(pendingRecordId);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
messageId = buffer.readLong();
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
pendingRecordId = buffer.readLong();
}
}
/**
@ -85,4 +94,8 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
return false;
return true;
}
public long getPendingRecordId() {
return pendingRecordId;
}
}

View File

@ -52,4 +52,8 @@ public interface ReplicatedLargeMessage {
*/
void addBytes(byte[] body) throws Exception;
void setPendingRecordID(long pendingRecordID);
long getPendingRecordID();
}

View File

@ -479,6 +479,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
}
final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false);
if (message != null) {
message.setPendingRecordID(packet.getPendingRecordId());
executor.execute(new Runnable() {
@Override
public void run() {

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
@ -241,9 +242,11 @@ public final class ReplicationManager implements ActiveMQComponent {
}
}
public void largeMessageDelete(final Long messageId) {
//we pass in storageManager to generate ID only if enabled
public void largeMessageDelete(final Long messageId, JournalStorageManager storageManager) {
if (enabled) {
sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId));
long pendingRecordID = storageManager.generateID();
sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID));
}
}

View File

@ -23,13 +23,11 @@ import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage;
public interface LargeServerMessage extends ReplicatedLargeMessage, ICoreMessage {
long NO_PENDING_ID = -1;
@Override
void addBytes(byte[] bytes) throws Exception;
void setPendingRecordID(long pendingRecordID);
long getPendingRecordID();
/**
* We have to copy the large message content in case of DLQ and paged messages
* For that we need to pre-mark the LargeMessage with a flag when it is paged

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class FailoverTestWithDivert extends FailoverTestBase {
private static final String DIVERT_ADDRESS = "jms.queue.testQueue";
private static final String DIVERT_FORWARD_ADDRESS = "jms.queue.divertedQueue";
private ClientSessionFactoryInternal sf;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
}
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
return getNettyAcceptorTransportConfiguration(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
return getNettyConnectorTransportConfiguration(live);
}
@Override
protected void createConfigs() throws Exception {
createReplicatedConfigs();
liveConfig.setJournalFileSize(10240000);
backupConfig.setJournalFileSize(10240000);
addQueue(liveConfig, DIVERT_ADDRESS, DIVERT_ADDRESS);
addQueue(liveConfig, DIVERT_FORWARD_ADDRESS, DIVERT_FORWARD_ADDRESS);
addDivert(liveConfig, DIVERT_ADDRESS, DIVERT_FORWARD_ADDRESS, false);
addDivert(backupConfig, DIVERT_ADDRESS, DIVERT_FORWARD_ADDRESS, false);
}
private void addQueue(Configuration serverConfig, String address, String name) {
List<CoreAddressConfiguration> addrConfigs = serverConfig.getAddressConfigurations();
CoreAddressConfiguration addrCfg = new CoreAddressConfiguration();
addrCfg.setName(address);
addrCfg.addRoutingType(RoutingType.ANYCAST);
CoreQueueConfiguration qConfig = new CoreQueueConfiguration();
qConfig.setName(name);
qConfig.setAddress(address);
addrCfg.addQueueConfiguration(qConfig);
addrConfigs.add(addrCfg);
}
private void addDivert(Configuration serverConfig, String source, String target, boolean exclusive) {
List<DivertConfiguration> divertConfigs = serverConfig.getDivertConfigurations();
DivertConfiguration newDivert = new DivertConfiguration();
newDivert.setName("myDivert");
newDivert.setAddress(source);
newDivert.setForwardingAddress(target);
newDivert.setExclusive(exclusive);
divertConfigs.add(newDivert);
}
@Test
public void testUniqueIDsWithDivert() throws Exception {
Map<String, Object> params = new HashMap<>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
TransportConfiguration tc = createTransportConfiguration(true, false, params);
ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(-1);
sf = createSessionFactoryAndWaitForTopology(locator, 2);
int minLarge = locator.getMinLargeMessageSize();
ClientSession session = sf.createSession(false, false);
addClientSession(session);
session.start();
final int num = 100;
ClientProducer producer = session.createProducer(DIVERT_ADDRESS);
for (int i = 0; i < num; i++) {
ClientMessage message = createLargeMessage(session, 2 * minLarge);
producer.send(message);
}
session.commit();
ClientConsumer consumer = session.createConsumer(DIVERT_ADDRESS);
for (int i = 0; i < num; i++) {
ClientMessage receivedFromSourceQueue = consumer.receive(5000);
assertNotNull(receivedFromSourceQueue);
receivedFromSourceQueue.acknowledge();
}
session.commit();
crash(session);
ClientConsumer consumer1 = session.createConsumer(DIVERT_FORWARD_ADDRESS);
for (int i = 0; i < num; i++) {
ClientMessage receivedFromTargetQueue = consumer1.receive(5000);
assertNotNull(receivedFromTargetQueue);
receivedFromTargetQueue.acknowledge();
}
session.commit();
}
private ClientMessage createLargeMessage(ClientSession session, final int largeSize) {
ClientMessage message = session.createMessage(true);
ActiveMQBuffer bodyBuffer = message.getBodyBuffer();
final int propSize = 10240;
while (bodyBuffer.writerIndex() < largeSize) {
byte[] prop = new byte[propSize];
bodyBuffer.writeBytes(prop);
}
return message;
}
}

View File

@ -204,7 +204,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
public void testSendPackets() throws Exception {
setupServer(true);
StorageManager storage = getStorage();
JournalStorageManager storage = getStorage();
manager = liveServer.getReplicationManager();
waitForComponent(manager);
@ -270,7 +270,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
manager.largeMessageWrite(500, new byte[1024]);
manager.largeMessageDelete(Long.valueOf(500));
manager.largeMessageDelete(Long.valueOf(500), storage);
blockOnReplication(storage, manager);