This commit is contained in:
Clebert Suconic 2019-06-21 15:09:09 -04:00
commit b3e740454c
3 changed files with 77 additions and 1 deletions

View File

@ -56,7 +56,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
// Constants ----------------------------------------------------- // Constants -----------------------------------------------------
private static final String ADDRESS_FILE = "address.txt"; public static final String ADDRESS_FILE = "address.txt";
// Attributes ---------------------------------------------------- // Attributes ----------------------------------------------------
@ -217,6 +217,12 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
addressString = reader.readLine(); addressString = reader.readLine();
} }
// there's no address listed in the file so we just skip it
if (addressString == null) {
ActiveMQServerLogger.LOGGER.emptyAddressFile(PagingStoreFactoryNIO.ADDRESS_FILE, file.toString());
continue;
}
SimpleString address = new SimpleString(addressString); SimpleString address = new SimpleString(addressString);
SequentialFileFactory factory = newFileFactory(guid); SequentialFileFactory factory = newFileFactory(guid);

View File

@ -1629,6 +1629,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 222281, value = "Federation upstream {0} policy ref {1} are too self referential, avoiding stack overflow , ", format = Message.Format.MESSAGE_FORMAT) @Message(id = 222281, value = "Federation upstream {0} policy ref {1} are too self referential, avoiding stack overflow , ", format = Message.Format.MESSAGE_FORMAT)
void federationAvoidStackOverflowPolicyRef(String upstreamName, String policyRef); void federationAvoidStackOverflowPolicyRef(String upstreamName, String policyRef);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222282, value = "File {0} at {1} is empty. Delete the empty file to stop this message.",
format = Message.Format.MESSAGE_FORMAT)
void emptyAddressFile(String addressFile, String directory);
@LogMessage(level = Logger.Level.ERROR) @LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e); void initializationError(@Cause Throwable e);

View File

@ -25,6 +25,7 @@ import javax.transaction.xa.Xid;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PrintWriter;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
@ -491,6 +492,70 @@ public class PagingTest extends ActiveMQTestBase {
System.out.println("pgComplete = " + pgComplete); System.out.println("pgComplete = " + pgComplete);
} }
@Test
public void testEmptyAddress() throws Exception {
if (storeType == StoreConfiguration.StoreType.FILE) {
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
final int numberOfMessages = 5000;
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
session.createQueue(PagingTest.ADDRESS, RoutingType.ANYCAST, PagingTest.ADDRESS, null, true);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
byte[] body = new byte[MESSAGE_SIZE];
ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= MESSAGE_SIZE; j++) {
bb.put(getSamplebyte(j));
}
for (int i = 0; i < numberOfMessages; i++) {
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeBytes(body);
producer.send(message);
if (i % 1000 == 0) {
session.commit();
}
}
session.commit();
producer.close();
session.close();
String addressTxt = server.getPagingManager().getPageStore(PagingTest.ADDRESS).getFolder().getAbsolutePath() + File.separator + PagingStoreFactoryNIO.ADDRESS_FILE;
server.stop();
// delete contents of address.txt
new PrintWriter(addressTxt).close();
final AtomicBoolean activationFailures = new AtomicBoolean();
server.registerActivationFailureListener(exception -> activationFailures.set(true));
server.start();
server.stop();
assertFalse(activationFailures.get());
}
}
@Test @Test
public void testPurge() throws Exception { public void testPurge() throws Exception {
clearDataRecreateServerDirs(); clearDataRecreateServerDirs();