ARTEMIS-4241 paging + FQQN is broken
This commit is contained in:
parent
05f5af6eb1
commit
673481369f
|
@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
|
|||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.utils.ByteUtil;
|
||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||
import org.apache.activemq.artemis.utils.SizeAwareMetric;
|
||||
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
||||
import org.apache.activemq.artemis.utils.collections.LongHashSet;
|
||||
|
@ -393,7 +394,7 @@ public final class PagingManagerImpl implements PagingManager {
|
|||
public void deletePageStore(final SimpleString storeName) throws Exception {
|
||||
syncLock.readLock().lock();
|
||||
try {
|
||||
PagingStore store = stores.remove(storeName);
|
||||
PagingStore store = stores.remove(CompositeAddress.extractAddressName(storeName));
|
||||
if (store != null) {
|
||||
store.stop();
|
||||
store.destroy();
|
||||
|
@ -407,7 +408,8 @@ public final class PagingManagerImpl implements PagingManager {
|
|||
* This method creates a new store if not exist.
|
||||
*/
|
||||
@Override
|
||||
public PagingStore getPageStore(final SimpleString storeName) throws Exception {
|
||||
public PagingStore getPageStore(final SimpleString rawStoreName) throws Exception {
|
||||
final SimpleString storeName = CompositeAddress.extractAddressName(rawStoreName);
|
||||
if (managementAddress != null && storeName.startsWith(managementAddress)) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -115,6 +115,7 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
|||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||
import org.apache.activemq.artemis.tests.util.RandomUtil;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||
import org.apache.activemq.artemis.utils.RetryRule;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.junit.After;
|
||||
|
@ -1393,6 +1394,79 @@ public class PagingTest extends ActiveMQTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFqqn() throws Exception {
|
||||
final SimpleString queue = RandomUtil.randomSimpleString();
|
||||
SimpleString fqqn = CompositeAddress.toFullyQualified(ADDRESS, queue);
|
||||
boolean persistentMessages = true;
|
||||
|
||||
clearDataRecreateServerDirs();
|
||||
|
||||
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
|
||||
|
||||
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
|
||||
|
||||
server.start();
|
||||
|
||||
final int numberOfMessages = 1000;
|
||||
|
||||
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
|
||||
|
||||
sf = createSessionFactory(locator);
|
||||
|
||||
ClientSession session = sf.createSession(false, false, false);
|
||||
|
||||
session.createQueue(new QueueConfiguration(fqqn).setRoutingType(RoutingType.ANYCAST));
|
||||
|
||||
ClientProducer producer = session.createProducer(fqqn);
|
||||
|
||||
ClientMessage message = null;
|
||||
|
||||
byte[] body = new byte[MESSAGE_SIZE];
|
||||
|
||||
for (int i = 0; i < numberOfMessages; i++) {
|
||||
message = session.createMessage(persistentMessages);
|
||||
|
||||
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
|
||||
|
||||
bodyLocal.writeBytes(body);
|
||||
|
||||
message.putIntProperty(new SimpleString("id"), i);
|
||||
|
||||
producer.send(message);
|
||||
if (i % 1000 == 0) {
|
||||
session.commit();
|
||||
}
|
||||
}
|
||||
|
||||
session.commit();
|
||||
|
||||
Wait.assertTrue(server.getPagingManager().getPageStore(ADDRESS)::isPaging, 5000, 100);
|
||||
assertEquals(ADDRESS, server.getPagingManager().getPageStore(ADDRESS).getAddress());
|
||||
|
||||
session.start();
|
||||
|
||||
ClientConsumer consumer = session.createConsumer(fqqn);
|
||||
|
||||
for (int i = 0; i < numberOfMessages; i++) {
|
||||
message = consumer.receive(5000);
|
||||
assertNotNull(message);
|
||||
message.acknowledge();
|
||||
|
||||
assertEquals(i, message.getIntProperty("id").intValue());
|
||||
if (i % 1000 == 0) {
|
||||
session.commit();
|
||||
}
|
||||
}
|
||||
|
||||
session.commit();
|
||||
|
||||
Wait.assertFalse(server.getPagingManager().getPageStore(ADDRESS)::isPaging, 5000, 100);
|
||||
|
||||
server.getPagingManager().deletePageStore(fqqn);
|
||||
assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(ADDRESS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPurge() throws Exception {
|
||||
clearDataRecreateServerDirs();
|
||||
|
|
Loading…
Reference in New Issue