diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java index 1f4062efd2..595ed557b0 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java @@ -26,6 +26,7 @@ import javax.xml.validation.Schema; import javax.xml.validation.SchemaFactory; import javax.xml.validation.Validator; import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -43,7 +44,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeSet; -import java.util.UUID; import io.airlift.airline.Command; import io.airlift.airline.Option; @@ -95,8 +95,6 @@ public final class XmlDataImporter extends ActionAbstract { final Map queueIDs = new HashMap<>(); - String tempFileName = ""; - HashMap oldPrefixTranslation = new HashMap<>(); private ClientSession session; @@ -273,7 +271,7 @@ public final class XmlDataImporter extends ActionAbstract { if (sort) { for (MessageTemp msgtmp : messages) { - sendMessage(msgtmp.queues, msgtmp.message); + sendMessage(msgtmp.queues, msgtmp.message, msgtmp.tempFileName); } } @@ -328,13 +326,14 @@ public final class XmlDataImporter extends ActionAbstract { boolean endLoop = false; + File largeMessageTemporaryFile = null; // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.) while (reader.hasNext()) { int eventType = reader.getEventType(); switch (eventType) { case XMLStreamConstants.START_ELEMENT: if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) { - processMessageBody(message.toCore()); + largeMessageTemporaryFile = processMessageBody(message.toCore()); } else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) { processMessageProperties(message); } else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) { @@ -354,9 +353,9 @@ public final class XmlDataImporter extends ActionAbstract { } if (sort) { - messages.add(new MessageTemp(id, queues, message)); + messages.add(new MessageTemp(id, queues, message, largeMessageTemporaryFile)); } else { - sendMessage(queues, message); + sendMessage(queues, message, largeMessageTemporaryFile); } } @@ -365,12 +364,14 @@ public final class XmlDataImporter extends ActionAbstract { long id; List queues; Message message; + File tempFileName; - MessageTemp(long id, List queues, Message message) { + MessageTemp(long id, List queues, Message message, File tempFileName) { this.message = message; this.queues = queues; this.message = message; this.id = id; + this.tempFileName = tempFileName; } } @@ -399,7 +400,7 @@ public final class XmlDataImporter extends ActionAbstract { return type; } - private void sendMessage(List queues, Message message) throws Exception { + private void sendMessage(List queues, Message message, File tempFileName) throws Exception { StringBuilder logMessage = new StringBuilder(); String destination = addressMap.get(queues.get(0)); @@ -446,12 +447,16 @@ public final class XmlDataImporter extends ActionAbstract { producer.send(message); } - if (tempFileName.length() > 0) { - File tempFile = new File(tempFileName); - if (!tempFile.delete()) { - ActiveMQServerLogger.LOGGER.couldNotDeleteTempFile(tempFileName); + if (tempFileName != null) { + try { + // this is to make sure the large message is sent before we delete it + // to avoid races + session.commit(); + } catch (Throwable dontcare) { + } + if (!tempFileName.delete()) { + ActiveMQServerLogger.LOGGER.couldNotDeleteTempFile(tempFileName.getAbsolutePath()); } - tempFileName = ""; } } @@ -531,7 +536,8 @@ public final class XmlDataImporter extends ActionAbstract { } } - private void processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException { + private File processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException { + File tempFileName = null; boolean isLarge = false; for (int i = 0; i < reader.getAttributeCount(); i++) { @@ -545,11 +551,11 @@ public final class XmlDataImporter extends ActionAbstract { logger.debug("XMLStreamReader impl: " + reader); } if (isLarge) { - tempFileName = UUID.randomUUID().toString() + ".tmp"; + tempFileName = File.createTempFile("largeMessage", ".tmp"); if (logger.isDebugEnabled()) { logger.debug("Creating temp file " + tempFileName + " for large message."); } - try (OutputStream out = new FileOutputStream(tempFileName)) { + try (OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFileName))) { getMessageBodyBytes(new MessageBodyBytesProcessor() { @Override public void processBodyBytes(byte[] bytes) throws IOException { @@ -568,6 +574,8 @@ public final class XmlDataImporter extends ActionAbstract { } }); } + + return tempFileName; } /** diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java index a041619d84..3ef23800c6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java @@ -25,6 +25,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.File; import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; @@ -520,6 +521,92 @@ public class XmlImportExportTest extends ActiveMQTestBase { session.commit(); } + @Test + public void testLargeMessagesNoTmpFiles() throws Exception { + server = createServer(true); + server.start(); + locator = createInVMNonHALocator(); + factory = createSessionFactory(locator); + ClientSession session = factory.createSession(false, false); + + LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager) server.getStorageManager()); + + fileMessage.setMessageID(1005); + fileMessage.setDurable(true); + + for (int i = 0; i < 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++) { + fileMessage.addBytes(new byte[]{getSamplebyte(i)}); + } + + fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); + + fileMessage.releaseResources(); + + session.createQueue("A", RoutingType.MULTICAST, "A", true); + + ClientProducer prod = session.createProducer("A"); + + prod.send(fileMessage); + prod.send(fileMessage); + + fileMessage.deleteFile(); + + session.commit(); + + session.close(); + locator.close(); + server.stop(); + + ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream(); + XmlDataExporter xmlDataExporter = new XmlDataExporter(); + xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory()); + System.out.print(new String(xmlOutputStream.toByteArray())); + + clearDataRecreateServerDirs(); + server.start(); + checkForLongs(); + locator = createInVMNonHALocator(); + factory = createSessionFactory(locator); + session = factory.createSession(false, true, true); + + ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); + XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.sort = true; + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); + xmlDataImporter.process(xmlInputStream, session); + session.close(); + session = factory.createSession(false, false); + session.start(); + + ClientConsumer cons = session.createConsumer("A"); + + ClientMessage msg = cons.receive(CONSUMER_TIMEOUT); + assertNotNull(msg); + assertEquals(2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, msg.getBodySize()); + + for (int i = 0; i < 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++) { + assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte()); + } + msg = cons.receive(CONSUMER_TIMEOUT); + assertNotNull(msg); + assertEquals(2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, msg.getBodySize()); + + for (int i = 0; i < 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++) { + assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte()); + } + + msg.acknowledge(); + session.commit(); + + //make sure there is not tmp file left + File workingDir = new File(System.getProperty("user.dir")); + String[] flist = workingDir.list(); + for (String fn : flist) { + assertFalse("leftover: " + fn, fn.endsWith(".tmp")); + } + } + @Test public void testLargeJmsTextMessage() throws Exception { basicSetUp();