This closes #471 ARTEMIS-471 Fixing Divert on LargeMessages and Replication
This commit is contained in:
commit
31e4e7d328
|
@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
|
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
|
||||||
|
import org.apache.activemq.artemis.core.io.util.FileIOUtil;
|
||||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||||
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
|
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
|
||||||
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
|
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
|
||||||
|
@ -113,14 +114,7 @@ public abstract class AbstractSequentialFile implements SequentialFile {
|
||||||
|
|
||||||
ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
|
ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
|
||||||
|
|
||||||
for (;;) {
|
FileIOUtil.copyData(this, newFileName, buffer);
|
||||||
buffer.rewind();
|
|
||||||
int size = this.read(buffer);
|
|
||||||
newFileName.writeDirect(buffer, false);
|
|
||||||
if (size < 10 * 1024) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
newFileName.close();
|
newFileName.close();
|
||||||
this.close();
|
this.close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -264,51 +264,63 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized ServerMessage copy() {
|
public ServerMessage copy() {
|
||||||
SequentialFile newfile = storageManager.createFileForLargeMessage(messageID, durable);
|
SequentialFile newfile = storageManager.createFileForLargeMessage(messageID, durable);
|
||||||
|
|
||||||
ServerMessage newMessage = new LargeServerMessageImpl(this, properties, newfile, messageID);
|
ServerMessage newMessage = new LargeServerMessageImpl(this, properties, newfile, messageID);
|
||||||
return newMessage;
|
return newMessage;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void copyFrom(final SequentialFile fileSource) throws Exception {
|
|
||||||
this.bodySize = -1;
|
|
||||||
this.pendingCopy = fileSource;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void finishCopy() throws Exception {
|
public ServerMessage copy(final long newID) {
|
||||||
if (pendingCopy != null) {
|
|
||||||
SequentialFile copyTo = createFile();
|
|
||||||
try {
|
|
||||||
this.pendingRecordID = storageManager.storePendingLargeMessage(this.messageID);
|
|
||||||
copyTo.open();
|
|
||||||
pendingCopy.open();
|
|
||||||
pendingCopy.copyTo(copyTo);
|
|
||||||
}
|
|
||||||
finally {
|
|
||||||
copyTo.close();
|
|
||||||
pendingCopy.close();
|
|
||||||
pendingCopy = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
closeFile();
|
|
||||||
bodySize = -1;
|
|
||||||
file = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The copy of the file itself will be done later by {@link LargeServerMessageImpl#finishCopy()}
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public synchronized ServerMessage copy(final long newID) {
|
|
||||||
try {
|
try {
|
||||||
SequentialFile newfile = storageManager.createFileForLargeMessage(newID, durable);
|
LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this);
|
||||||
|
|
||||||
|
|
||||||
|
byte[] bufferBytes = new byte[100 * 1024];
|
||||||
|
|
||||||
|
ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
|
||||||
|
|
||||||
|
long oldPosition = file.position();
|
||||||
|
|
||||||
|
boolean originallyOpen = file.isOpen();
|
||||||
|
file.open();
|
||||||
|
file.position(0);
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
// The buffer is reused...
|
||||||
|
// We need to make sure we clear the limits and the buffer before reusing it
|
||||||
|
buffer.clear();
|
||||||
|
int bytesRead = file.read(buffer);
|
||||||
|
|
||||||
|
byte[] bufferToWrite;
|
||||||
|
if (bytesRead == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else if (bytesRead == bufferBytes.length) {
|
||||||
|
bufferToWrite = bufferBytes;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
bufferToWrite = new byte[bytesRead];
|
||||||
|
System.arraycopy(bufferBytes, 0, bufferToWrite, 0, bytesRead);
|
||||||
|
}
|
||||||
|
|
||||||
|
newMessage.addBytes(bufferToWrite);
|
||||||
|
|
||||||
|
if (bytesRead < bufferBytes.length) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
file.position(oldPosition);
|
||||||
|
|
||||||
|
if (!originallyOpen) {
|
||||||
|
file.close();
|
||||||
|
}
|
||||||
|
|
||||||
LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newfile, newID);
|
|
||||||
newMessage.copyFrom(createFile());
|
|
||||||
return newMessage;
|
return newMessage;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
ActiveMQServerLogger.LOGGER.lareMessageErrorCopying(e, this);
|
ActiveMQServerLogger.LOGGER.lareMessageErrorCopying(e, this);
|
||||||
|
|
|
@ -48,8 +48,6 @@ public interface ServerMessage extends MessageInternal, EncodingSupport {
|
||||||
|
|
||||||
ServerMessage copy(long newID);
|
ServerMessage copy(long newID);
|
||||||
|
|
||||||
void finishCopy() throws Exception;
|
|
||||||
|
|
||||||
ServerMessage copy();
|
ServerMessage copy();
|
||||||
|
|
||||||
int getMemoryEstimate();
|
int getMemoryEstimate();
|
||||||
|
|
|
@ -148,7 +148,6 @@ public class Redistributor implements Consumer {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!reference.getMessage().isLargeMessage()) {
|
if (!reference.getMessage().isLargeMessage()) {
|
||||||
routingInfo.getB().finishCopy();
|
|
||||||
|
|
||||||
postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
|
postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
|
||||||
|
|
||||||
|
@ -160,7 +159,6 @@ public class Redistributor implements Consumer {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
routingInfo.getB().finishCopy();
|
|
||||||
|
|
||||||
postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
|
postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
|
||||||
|
|
||||||
|
|
|
@ -88,7 +88,6 @@ public class DivertImpl implements Divert {
|
||||||
// Shouldn't copy if it's not routed anywhere else
|
// Shouldn't copy if it's not routed anywhere else
|
||||||
if (!forwardAddress.equals(message.getAddress())) {
|
if (!forwardAddress.equals(message.getAddress())) {
|
||||||
copy = message.copy(id);
|
copy = message.copy(id);
|
||||||
copy.finishCopy();
|
|
||||||
|
|
||||||
// This will set the original MessageId, and the original address
|
// This will set the original MessageId, and the original address
|
||||||
copy.setOriginalHeaders(message, null, false);
|
copy.setOriginalHeaders(message, null, false);
|
||||||
|
|
|
@ -2222,7 +2222,6 @@ public class QueueImpl implements Queue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
copyMessage.finishCopy();
|
|
||||||
postOffice.processRoute(copyMessage, routingContext, false);
|
postOffice.processRoute(copyMessage, routingContext, false);
|
||||||
|
|
||||||
ref.handled();
|
ref.handled();
|
||||||
|
|
|
@ -185,10 +185,6 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage {
|
||||||
return m;
|
return m;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void finishCopy() throws Exception {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ServerMessage copy() {
|
public ServerMessage copy() {
|
||||||
// This is a simple copy, used only to avoid changing original properties
|
// This is a simple copy, used only to avoid changing original properties
|
||||||
|
@ -216,7 +212,6 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
ServerMessage copy = copy(newID);
|
ServerMessage copy = copy(newID);
|
||||||
copy.finishCopy();
|
|
||||||
|
|
||||||
if (copyOriginalHeaders) {
|
if (copyOriginalHeaders) {
|
||||||
copy.setOriginalHeaders(this, originalReference, expiry);
|
copy.setOriginalHeaders(this, originalReference, expiry);
|
||||||
|
|
|
@ -336,11 +336,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void finishCopy() throws Exception {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ServerMessage copy() {
|
public ServerMessage copy() {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -0,0 +1,258 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.divert;
|
||||||
|
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.MapMessage;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
|
||||||
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
|
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
|
||||||
|
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
|
||||||
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class ReplicationWithDivertTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
public static final String JMS_SOURCE_QUEUE = "Queue";
|
||||||
|
public static final String SOURCE_QUEUE = "jms.queue." + JMS_SOURCE_QUEUE;
|
||||||
|
public static final String JMS_TARGET_QUEUE = "DestQueue";
|
||||||
|
public static final String TARGET_QUEUE = "jms.queue." + JMS_TARGET_QUEUE;
|
||||||
|
public static int messageChunkCount = 0;
|
||||||
|
|
||||||
|
private static ActiveMQServer backupServer;
|
||||||
|
private static ActiveMQServer liveServer;
|
||||||
|
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=10000&HA=true&retryInterval=100&reconnectAttempts=-1&producerWindowSize=10000");
|
||||||
|
ActiveMQConnection connection;
|
||||||
|
Session session;
|
||||||
|
Queue queue;
|
||||||
|
Queue targetQueue;
|
||||||
|
MessageProducer producer;
|
||||||
|
|
||||||
|
Configuration backupConfig;
|
||||||
|
|
||||||
|
Configuration liveConfig;
|
||||||
|
|
||||||
|
// To inform the main thread the condition is met
|
||||||
|
static final ReusableLatch flagChunkEntered = new ReusableLatch(1);
|
||||||
|
// To wait while the condition is worked out
|
||||||
|
static final ReusableLatch flagChunkWait = new ReusableLatch(1);
|
||||||
|
|
||||||
|
// To inform the main thread the condition is met
|
||||||
|
static final ReusableLatch flagSyncEntered = new ReusableLatch(1);
|
||||||
|
// To wait while the condition is worked out
|
||||||
|
static final ReusableLatch flagSyncWait = new ReusableLatch(1);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
|
||||||
|
System.out.println("Tmp::" + getTemporaryDir());
|
||||||
|
|
||||||
|
flagChunkEntered.setCount(1);
|
||||||
|
flagChunkWait.setCount(1);
|
||||||
|
|
||||||
|
flagSyncEntered.setCount(1);
|
||||||
|
flagSyncWait.setCount(1);
|
||||||
|
|
||||||
|
messageChunkCount = 0;
|
||||||
|
|
||||||
|
TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
|
||||||
|
TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
|
||||||
|
TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
|
||||||
|
TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);
|
||||||
|
|
||||||
|
backupConfig = createDefaultInVMConfig().setBindingsDirectory(getBindingsDir(0, true)).
|
||||||
|
setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).
|
||||||
|
setLargeMessagesDirectory(getLargeMessagesDir(0, true));
|
||||||
|
backupConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(SOURCE_QUEUE).setName(SOURCE_QUEUE));
|
||||||
|
backupConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(TARGET_QUEUE).setName(TARGET_QUEUE));
|
||||||
|
|
||||||
|
|
||||||
|
DivertConfiguration divertConfiguration = new DivertConfiguration().setName("Test").setAddress(SOURCE_QUEUE).setForwardingAddress(TARGET_QUEUE).setRoutingName("Test");
|
||||||
|
|
||||||
|
|
||||||
|
liveConfig = createDefaultInVMConfig();
|
||||||
|
liveConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(SOURCE_QUEUE).setName(SOURCE_QUEUE).setDurable(true));
|
||||||
|
liveConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(TARGET_QUEUE).setName(TARGET_QUEUE).setDurable(true));
|
||||||
|
liveConfig.addDivertConfiguration(divertConfiguration);
|
||||||
|
|
||||||
|
backupConfig.addDivertConfiguration(divertConfiguration);
|
||||||
|
|
||||||
|
ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor);
|
||||||
|
|
||||||
|
liveServer = createServer(liveConfig);
|
||||||
|
liveServer.start();
|
||||||
|
|
||||||
|
startBackup();
|
||||||
|
|
||||||
|
waitForServerToStart(liveServer);
|
||||||
|
|
||||||
|
// Just to make sure the expression worked
|
||||||
|
Assert.assertEquals(10000, factory.getMinLargeMessageSize());
|
||||||
|
Assert.assertEquals(10000, factory.getProducerWindowSize());
|
||||||
|
Assert.assertEquals(100, factory.getRetryInterval());
|
||||||
|
Assert.assertEquals(-1, factory.getReconnectAttempts());
|
||||||
|
Assert.assertTrue(factory.isHA());
|
||||||
|
|
||||||
|
connection = (ActiveMQConnection) factory.createConnection();
|
||||||
|
session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
queue = session.createQueue(JMS_SOURCE_QUEUE);
|
||||||
|
targetQueue = session.createQueue(JMS_TARGET_QUEUE);
|
||||||
|
|
||||||
|
producer = session.createProducer(queue);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startBackup() throws Exception {
|
||||||
|
backupServer = createServer(backupConfig);
|
||||||
|
backupServer.start();
|
||||||
|
|
||||||
|
waitForServerToStart(backupServer);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void stopServers() throws Exception {
|
||||||
|
if (connection != null) {
|
||||||
|
try {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (backupServer != null) {
|
||||||
|
backupServer.stop();
|
||||||
|
backupServer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (liveServer != null) {
|
||||||
|
liveServer.stop();
|
||||||
|
liveServer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
backupServer = liveServer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendLargeMessage() throws Exception {
|
||||||
|
|
||||||
|
final CountDownLatch failedOver = new CountDownLatch(1);
|
||||||
|
connection.setFailoverListener(new FailoverEventListener() {
|
||||||
|
@Override
|
||||||
|
public void failoverEvent(FailoverEventType eventType) {
|
||||||
|
failedOver.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Thread t;
|
||||||
|
|
||||||
|
final int numberOfMessage = 5;
|
||||||
|
{
|
||||||
|
final MapMessage message = createLargeMessage();
|
||||||
|
|
||||||
|
t = new Thread() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < numberOfMessage; i++) {
|
||||||
|
producer.send(message);
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (JMSException expected) {
|
||||||
|
expected.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
t.start();
|
||||||
|
|
||||||
|
t.join(10000);
|
||||||
|
|
||||||
|
{
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
for (int msgi = 0; msgi < numberOfMessage; msgi++) {
|
||||||
|
MapMessage message = (MapMessage) consumer.receive(5000);
|
||||||
|
|
||||||
|
Assert.assertNotNull(message);
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
Assert.assertEquals(200 * 1024, message.getBytes("test" + i).length);
|
||||||
|
}
|
||||||
|
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertFalse(t.isAlive());
|
||||||
|
liveServer.stop(true);
|
||||||
|
Assert.assertTrue(failedOver.await(10, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
{
|
||||||
|
MessageConsumer consumer = session.createConsumer(targetQueue);
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
for (int msgi = 0; msgi < numberOfMessage; msgi++) {
|
||||||
|
MapMessage message = (MapMessage) consumer.receive(5000);
|
||||||
|
|
||||||
|
Assert.assertNotNull(message);
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
Assert.assertEquals(200 * 1024, message.getBytes("test" + i).length);
|
||||||
|
}
|
||||||
|
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private MapMessage createLargeMessage() throws JMSException {
|
||||||
|
MapMessage message = session.createMapMessage();
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
message.setBytes("test" + i, new byte[200 * 1024]);
|
||||||
|
}
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue