This commit is contained in:
Clebert Suconic 2018-02-02 12:12:49 -05:00
commit 2a72923e8c
2 changed files with 112 additions and 17 deletions

View File

@ -26,6 +26,7 @@ import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory; import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator; import javax.xml.validation.Validator;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
@ -43,7 +44,6 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.UUID;
import io.airlift.airline.Command; import io.airlift.airline.Command;
import io.airlift.airline.Option; import io.airlift.airline.Option;
@ -95,8 +95,6 @@ public final class XmlDataImporter extends ActionAbstract {
final Map<String, Long> queueIDs = new HashMap<>(); final Map<String, Long> queueIDs = new HashMap<>();
String tempFileName = "";
HashMap<String, String> oldPrefixTranslation = new HashMap<>(); HashMap<String, String> oldPrefixTranslation = new HashMap<>();
private ClientSession session; private ClientSession session;
@ -273,7 +271,7 @@ public final class XmlDataImporter extends ActionAbstract {
if (sort) { if (sort) {
for (MessageTemp msgtmp : messages) { for (MessageTemp msgtmp : messages) {
sendMessage(msgtmp.queues, msgtmp.message); sendMessage(msgtmp.queues, msgtmp.message, msgtmp.tempFileName);
} }
} }
@ -328,13 +326,14 @@ public final class XmlDataImporter extends ActionAbstract {
boolean endLoop = false; boolean endLoop = false;
File largeMessageTemporaryFile = null;
// loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.) // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.)
while (reader.hasNext()) { while (reader.hasNext()) {
int eventType = reader.getEventType(); int eventType = reader.getEventType();
switch (eventType) { switch (eventType) {
case XMLStreamConstants.START_ELEMENT: case XMLStreamConstants.START_ELEMENT:
if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) { if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) {
processMessageBody(message.toCore()); largeMessageTemporaryFile = processMessageBody(message.toCore());
} else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) { } else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) {
processMessageProperties(message); processMessageProperties(message);
} else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) { } else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) {
@ -354,9 +353,9 @@ public final class XmlDataImporter extends ActionAbstract {
} }
if (sort) { if (sort) {
messages.add(new MessageTemp(id, queues, message)); messages.add(new MessageTemp(id, queues, message, largeMessageTemporaryFile));
} else { } else {
sendMessage(queues, message); sendMessage(queues, message, largeMessageTemporaryFile);
} }
} }
@ -365,12 +364,14 @@ public final class XmlDataImporter extends ActionAbstract {
long id; long id;
List<String> queues; List<String> queues;
Message message; Message message;
File tempFileName;
MessageTemp(long id, List<String> queues, Message message) { MessageTemp(long id, List<String> queues, Message message, File tempFileName) {
this.message = message; this.message = message;
this.queues = queues; this.queues = queues;
this.message = message; this.message = message;
this.id = id; this.id = id;
this.tempFileName = tempFileName;
} }
} }
@ -399,7 +400,7 @@ public final class XmlDataImporter extends ActionAbstract {
return type; return type;
} }
private void sendMessage(List<String> queues, Message message) throws Exception { private void sendMessage(List<String> queues, Message message, File tempFileName) throws Exception {
StringBuilder logMessage = new StringBuilder(); StringBuilder logMessage = new StringBuilder();
String destination = addressMap.get(queues.get(0)); String destination = addressMap.get(queues.get(0));
@ -446,12 +447,16 @@ public final class XmlDataImporter extends ActionAbstract {
producer.send(message); producer.send(message);
} }
if (tempFileName.length() > 0) { if (tempFileName != null) {
File tempFile = new File(tempFileName); try {
if (!tempFile.delete()) { // this is to make sure the large message is sent before we delete it
ActiveMQServerLogger.LOGGER.couldNotDeleteTempFile(tempFileName); // to avoid races
session.commit();
} catch (Throwable dontcare) {
}
if (!tempFileName.delete()) {
ActiveMQServerLogger.LOGGER.couldNotDeleteTempFile(tempFileName.getAbsolutePath());
} }
tempFileName = "";
} }
} }
@ -531,7 +536,8 @@ public final class XmlDataImporter extends ActionAbstract {
} }
} }
private void processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException { private File processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException {
File tempFileName = null;
boolean isLarge = false; boolean isLarge = false;
for (int i = 0; i < reader.getAttributeCount(); i++) { for (int i = 0; i < reader.getAttributeCount(); i++) {
@ -545,11 +551,11 @@ public final class XmlDataImporter extends ActionAbstract {
logger.debug("XMLStreamReader impl: " + reader); logger.debug("XMLStreamReader impl: " + reader);
} }
if (isLarge) { if (isLarge) {
tempFileName = UUID.randomUUID().toString() + ".tmp"; tempFileName = File.createTempFile("largeMessage", ".tmp");
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Creating temp file " + tempFileName + " for large message."); logger.debug("Creating temp file " + tempFileName + " for large message.");
} }
try (OutputStream out = new FileOutputStream(tempFileName)) { try (OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFileName))) {
getMessageBodyBytes(new MessageBodyBytesProcessor() { getMessageBodyBytes(new MessageBodyBytesProcessor() {
@Override @Override
public void processBodyBytes(byte[] bytes) throws IOException { public void processBodyBytes(byte[] bytes) throws IOException {
@ -568,6 +574,8 @@ public final class XmlDataImporter extends ActionAbstract {
} }
}); });
} }
return tempFileName;
} }
/** /**

View File

@ -25,6 +25,7 @@ import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
@ -520,6 +521,92 @@ public class XmlImportExportTest extends ActiveMQTestBase {
session.commit(); session.commit();
} }
@Test
public void testLargeMessagesNoTmpFiles() throws Exception {
server = createServer(true);
server.start();
locator = createInVMNonHALocator();
factory = createSessionFactory(locator);
ClientSession session = factory.createSession(false, false);
LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager) server.getStorageManager());
fileMessage.setMessageID(1005);
fileMessage.setDurable(true);
for (int i = 0; i < 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++) {
fileMessage.addBytes(new byte[]{getSamplebyte(i)});
}
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
fileMessage.releaseResources();
session.createQueue("A", RoutingType.MULTICAST, "A", true);
ClientProducer prod = session.createProducer("A");
prod.send(fileMessage);
prod.send(fileMessage);
fileMessage.deleteFile();
session.commit();
session.close();
locator.close();
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
server.start();
checkForLongs();
locator = createInVMNonHALocator();
factory = createSessionFactory(locator);
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.sort = true;
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session);
session.close();
session = factory.createSession(false, false);
session.start();
ClientConsumer cons = session.createConsumer("A");
ClientMessage msg = cons.receive(CONSUMER_TIMEOUT);
assertNotNull(msg);
assertEquals(2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, msg.getBodySize());
for (int i = 0; i < 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++) {
assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte());
}
msg = cons.receive(CONSUMER_TIMEOUT);
assertNotNull(msg);
assertEquals(2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, msg.getBodySize());
for (int i = 0; i < 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++) {
assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte());
}
msg.acknowledge();
session.commit();
//make sure there is not tmp file left
File workingDir = new File(System.getProperty("user.dir"));
String[] flist = workingDir.list();
for (String fn : flist) {
assertFalse("leftover: " + fn, fn.endsWith(".tmp"));
}
}
@Test @Test
public void testLargeJmsTextMessage() throws Exception { public void testLargeJmsTextMessage() throws Exception {
basicSetUp(); basicSetUp();