From 17db696e8b43ea461b8cbaa6010538f2e20af549 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Thu, 1 Feb 2018 22:36:57 +0800 Subject: [PATCH 1/2] ARTEMIS-1652 XmlDataImporter forgets delete tmp files When using the tool to import more than one large messages from xml exported file, this utility class will create some tmp files, each for one large message. However it only delete one of the tmp files. All the rest of tmp files won't get cleaned up. --- .../commands/tools/xml/XmlDataImporter.java | 13 +-- .../persistence/XmlImportExportTest.java | 87 +++++++++++++++++++ 2 files changed, 94 insertions(+), 6 deletions(-) diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java index 1f4062efd2..102cf5ab6b 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java @@ -273,7 +273,7 @@ public final class XmlDataImporter extends ActionAbstract { if (sort) { for (MessageTemp msgtmp : messages) { - sendMessage(msgtmp.queues, msgtmp.message); + sendMessage(msgtmp.queues, msgtmp.message, msgtmp.tempFileName); } } @@ -354,9 +354,9 @@ public final class XmlDataImporter extends ActionAbstract { } if (sort) { - messages.add(new MessageTemp(id, queues, message)); + messages.add(new MessageTemp(id, queues, message, tempFileName)); } else { - sendMessage(queues, message); + sendMessage(queues, message, tempFileName); } } @@ -365,12 +365,14 @@ public final class XmlDataImporter extends ActionAbstract { long id; List queues; Message message; + String tempFileName; - MessageTemp(long id, List queues, Message message) { + MessageTemp(long id, List queues, Message message, String tempFileName) { this.message = message; this.queues = queues; this.message = message; this.id = id; + this.tempFileName = tempFileName; } } @@ -399,7 +401,7 @@ public final class XmlDataImporter extends ActionAbstract { return type; } - private void sendMessage(List queues, Message message) throws Exception { + private void sendMessage(List queues, Message message, String tempFileName) throws Exception { StringBuilder logMessage = new StringBuilder(); String destination = addressMap.get(queues.get(0)); @@ -451,7 +453,6 @@ public final class XmlDataImporter extends ActionAbstract { if (!tempFile.delete()) { ActiveMQServerLogger.LOGGER.couldNotDeleteTempFile(tempFileName); } - tempFileName = ""; } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java index a041619d84..3ef23800c6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java @@ -25,6 +25,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.File; import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; @@ -520,6 +521,92 @@ public class XmlImportExportTest extends ActiveMQTestBase { session.commit(); } + @Test + public void testLargeMessagesNoTmpFiles() throws Exception { + server = createServer(true); + server.start(); + locator = createInVMNonHALocator(); + factory = createSessionFactory(locator); + ClientSession session = factory.createSession(false, false); + + LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager) server.getStorageManager()); + + fileMessage.setMessageID(1005); + fileMessage.setDurable(true); + + for (int i = 0; i < 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++) { + fileMessage.addBytes(new byte[]{getSamplebyte(i)}); + } + + fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); + + fileMessage.releaseResources(); + + session.createQueue("A", RoutingType.MULTICAST, "A", true); + + ClientProducer prod = session.createProducer("A"); + + prod.send(fileMessage); + prod.send(fileMessage); + + fileMessage.deleteFile(); + + session.commit(); + + session.close(); + locator.close(); + server.stop(); + + ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream(); + XmlDataExporter xmlDataExporter = new XmlDataExporter(); + xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory()); + System.out.print(new String(xmlOutputStream.toByteArray())); + + clearDataRecreateServerDirs(); + server.start(); + checkForLongs(); + locator = createInVMNonHALocator(); + factory = createSessionFactory(locator); + session = factory.createSession(false, true, true); + + ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); + XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.sort = true; + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); + xmlDataImporter.process(xmlInputStream, session); + session.close(); + session = factory.createSession(false, false); + session.start(); + + ClientConsumer cons = session.createConsumer("A"); + + ClientMessage msg = cons.receive(CONSUMER_TIMEOUT); + assertNotNull(msg); + assertEquals(2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, msg.getBodySize()); + + for (int i = 0; i < 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++) { + assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte()); + } + msg = cons.receive(CONSUMER_TIMEOUT); + assertNotNull(msg); + assertEquals(2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, msg.getBodySize()); + + for (int i = 0; i < 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++) { + assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte()); + } + + msg.acknowledge(); + session.commit(); + + //make sure there is not tmp file left + File workingDir = new File(System.getProperty("user.dir")); + String[] flist = workingDir.list(); + for (String fn : flist) { + assertFalse("leftover: " + fn, fn.endsWith(".tmp")); + } + } + @Test public void testLargeJmsTextMessage() throws Exception { basicSetUp(); From b2a71d27301f30e15f39bf425346c9b134dc39f0 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 2 Feb 2018 11:32:58 -0500 Subject: [PATCH 2/2] ARTEMIS-1652 Improving delete logic on XmlDataImporter Before this the test would use a string and a temporary file on the user's folder. After this the test will use a temporary file with the proper File.createTemporaryFile method. --- .../commands/tools/xml/XmlDataImporter.java | 39 +++++++++++-------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java index 102cf5ab6b..595ed557b0 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java @@ -26,6 +26,7 @@ import javax.xml.validation.Schema; import javax.xml.validation.SchemaFactory; import javax.xml.validation.Validator; import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -43,7 +44,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeSet; -import java.util.UUID; import io.airlift.airline.Command; import io.airlift.airline.Option; @@ -95,8 +95,6 @@ public final class XmlDataImporter extends ActionAbstract { final Map queueIDs = new HashMap<>(); - String tempFileName = ""; - HashMap oldPrefixTranslation = new HashMap<>(); private ClientSession session; @@ -328,13 +326,14 @@ public final class XmlDataImporter extends ActionAbstract { boolean endLoop = false; + File largeMessageTemporaryFile = null; // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.) while (reader.hasNext()) { int eventType = reader.getEventType(); switch (eventType) { case XMLStreamConstants.START_ELEMENT: if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) { - processMessageBody(message.toCore()); + largeMessageTemporaryFile = processMessageBody(message.toCore()); } else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) { processMessageProperties(message); } else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) { @@ -354,9 +353,9 @@ public final class XmlDataImporter extends ActionAbstract { } if (sort) { - messages.add(new MessageTemp(id, queues, message, tempFileName)); + messages.add(new MessageTemp(id, queues, message, largeMessageTemporaryFile)); } else { - sendMessage(queues, message, tempFileName); + sendMessage(queues, message, largeMessageTemporaryFile); } } @@ -365,9 +364,9 @@ public final class XmlDataImporter extends ActionAbstract { long id; List queues; Message message; - String tempFileName; + File tempFileName; - MessageTemp(long id, List queues, Message message, String tempFileName) { + MessageTemp(long id, List queues, Message message, File tempFileName) { this.message = message; this.queues = queues; this.message = message; @@ -401,7 +400,7 @@ public final class XmlDataImporter extends ActionAbstract { return type; } - private void sendMessage(List queues, Message message, String tempFileName) throws Exception { + private void sendMessage(List queues, Message message, File tempFileName) throws Exception { StringBuilder logMessage = new StringBuilder(); String destination = addressMap.get(queues.get(0)); @@ -448,10 +447,15 @@ public final class XmlDataImporter extends ActionAbstract { producer.send(message); } - if (tempFileName.length() > 0) { - File tempFile = new File(tempFileName); - if (!tempFile.delete()) { - ActiveMQServerLogger.LOGGER.couldNotDeleteTempFile(tempFileName); + if (tempFileName != null) { + try { + // this is to make sure the large message is sent before we delete it + // to avoid races + session.commit(); + } catch (Throwable dontcare) { + } + if (!tempFileName.delete()) { + ActiveMQServerLogger.LOGGER.couldNotDeleteTempFile(tempFileName.getAbsolutePath()); } } } @@ -532,7 +536,8 @@ public final class XmlDataImporter extends ActionAbstract { } } - private void processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException { + private File processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException { + File tempFileName = null; boolean isLarge = false; for (int i = 0; i < reader.getAttributeCount(); i++) { @@ -546,11 +551,11 @@ public final class XmlDataImporter extends ActionAbstract { logger.debug("XMLStreamReader impl: " + reader); } if (isLarge) { - tempFileName = UUID.randomUUID().toString() + ".tmp"; + tempFileName = File.createTempFile("largeMessage", ".tmp"); if (logger.isDebugEnabled()) { logger.debug("Creating temp file " + tempFileName + " for large message."); } - try (OutputStream out = new FileOutputStream(tempFileName)) { + try (OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFileName))) { getMessageBodyBytes(new MessageBodyBytesProcessor() { @Override public void processBodyBytes(byte[] bytes) throws IOException { @@ -569,6 +574,8 @@ public final class XmlDataImporter extends ActionAbstract { } }); } + + return tempFileName; } /**