ARTEMIS-5173 avoiding test failure
This commit is contained in:
parent
d49476d0a1
commit
e3c49e63d7
|
@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.fail;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
|
@ -38,6 +39,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
import org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator;
|
import org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator;
|
||||||
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
|
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
|
||||||
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||||
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ScaleDownHandler;
|
import org.apache.activemq.artemis.core.server.impl.ScaleDownHandler;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
|
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
|
||||||
|
@ -160,9 +162,8 @@ public class ScaleDownDirectTest extends ClusterTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@TestTemplate
|
@TestTemplate
|
||||||
public void testPaging() throws Exception {
|
public void testPaging() throws Throwable {
|
||||||
final int CHUNK_SIZE = 50;
|
final int messageCount = 50;
|
||||||
int messageCount = 0;
|
|
||||||
final String addressName = "testAddress";
|
final String addressName = "testAddress";
|
||||||
final String queueName = "testQueue";
|
final String queueName = "testQueue";
|
||||||
|
|
||||||
|
@ -173,34 +174,44 @@ public class ScaleDownDirectTest extends ClusterTestBase {
|
||||||
ClientSession session = addClientSession(sf.createSession(false, false));
|
ClientSession session = addClientSession(sf.createSession(false, false));
|
||||||
ClientProducer producer = addClientProducer(session.createProducer(addressName));
|
ClientProducer producer = addClientProducer(session.createProducer(addressName));
|
||||||
|
|
||||||
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024);
|
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(1024);
|
||||||
servers[0].getAddressSettingsRepository().addMatch("#", defaultSetting);
|
servers[0].getAddressSettingsRepository().addMatch("#", defaultSetting);
|
||||||
|
Queue queue0 = servers[0].locateQueue(queueName);
|
||||||
|
queue0.getPagingStore().startPaging();
|
||||||
|
assertTrue(queue0.getPagingStore().isPaging());
|
||||||
|
|
||||||
while (!servers[0].getPagingManager().getPageStore(SimpleString.of(addressName)).isPaging()) {
|
|
||||||
for (int i = 0; i < CHUNK_SIZE; i++) {
|
|
||||||
Message message = session.createMessage(true);
|
|
||||||
message.getBodyBuffer().writeBytes(new byte[1024]);
|
|
||||||
// The only purpose of this count here is for eventually debug messages on print-data / print-pages
|
|
||||||
// message.putIntProperty("count", messageCount);
|
|
||||||
producer.send(message);
|
|
||||||
messageCount++;
|
|
||||||
}
|
|
||||||
session.commit();
|
|
||||||
}
|
|
||||||
|
|
||||||
assertEquals(messageCount, performScaledown());
|
|
||||||
|
|
||||||
servers[0].stop();
|
|
||||||
|
|
||||||
addConsumer(0, 1, queueName, null);
|
|
||||||
for (int i = 0; i < messageCount; i++) {
|
for (int i = 0; i < messageCount; i++) {
|
||||||
ClientMessage message = consumers[0].getConsumer().receive(500);
|
Message message = session.createMessage(true);
|
||||||
assertNotNull(message);
|
message.getBodyBuffer().writeBytes(new byte[1024]);
|
||||||
// Assert.assertEquals(i, message.getIntProperty("count").intValue());
|
// The only purpose of this count here is for eventually debug messages on print-data / print-pages
|
||||||
|
// message.putIntProperty("count", messageCount);
|
||||||
|
producer.send(message);
|
||||||
}
|
}
|
||||||
|
session.commit();
|
||||||
|
|
||||||
assertNull(consumers[0].getConsumer().receiveImmediate());
|
assertTrue(queue0.getPagingStore().isPaging());
|
||||||
removeConsumer(0);
|
|
||||||
|
try {
|
||||||
|
Wait.assertEquals(messageCount, queue0::getMessageCount);
|
||||||
|
queue0.flushExecutor();
|
||||||
|
queue0.getPagingStore().getExecutor().flush(10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
long scaledDown = performScaledown();
|
||||||
|
|
||||||
|
servers[0].stop();
|
||||||
|
|
||||||
|
addConsumer(0, 1, queueName, null);
|
||||||
|
for (int i = 0; i < scaledDown; i++) {
|
||||||
|
ClientMessage message = consumers[0].getConsumer().receive(500);
|
||||||
|
assertNotNull(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNull(consumers[0].getConsumer().receiveImmediate());
|
||||||
|
removeConsumer(0);
|
||||||
|
} catch (Throwable ig) {
|
||||||
|
ig.printStackTrace();
|
||||||
|
throw ig;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@TestTemplate
|
@TestTemplate
|
||||||
|
|
Loading…
Reference in New Issue