Improving up time of some tests
This commit is contained in:
parent
1492fe7adc
commit
077a416ee0
|
@ -168,7 +168,7 @@ public class PagingStoreImpl implements PagingStore
|
||||||
|
|
||||||
this.syncNonTransactional = syncNonTransactional;
|
this.syncNonTransactional = syncNonTransactional;
|
||||||
|
|
||||||
if (scheduledExecutor != null)
|
if (scheduledExecutor != null && syncTimeout > 0)
|
||||||
{
|
{
|
||||||
this.syncTimer = new PageSyncTimer(this, scheduledExecutor, syncTimeout);
|
this.syncTimer = new PageSyncTimer(this, scheduledExecutor, syncTimeout);
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,6 +89,7 @@ public class PagingLeakTest extends ActiveMQTestBase
|
||||||
)
|
)
|
||||||
public void testValidateLeak() throws Throwable
|
public void testValidateLeak() throws Throwable
|
||||||
{
|
{
|
||||||
|
System.out.println("location::" + getBindingsDir());
|
||||||
|
|
||||||
List<PagePositionImpl> positions = new ArrayList<PagePositionImpl>();
|
List<PagePositionImpl> positions = new ArrayList<PagePositionImpl>();
|
||||||
|
|
||||||
|
@ -121,13 +122,15 @@ public class PagingLeakTest extends ActiveMQTestBase
|
||||||
// A backup that will be waiting to be activated
|
// A backup that will be waiting to be activated
|
||||||
Configuration config = createDefaultNettyConfig();
|
Configuration config = createDefaultNettyConfig();
|
||||||
|
|
||||||
|
config.setJournalBufferTimeout_AIO(0).setJournalBufferTimeout_NIO(0);
|
||||||
|
|
||||||
final ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, true));
|
final ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, true));
|
||||||
|
|
||||||
server.start();
|
server.start();
|
||||||
|
|
||||||
AddressSettings settings = new AddressSettings()
|
AddressSettings settings = new AddressSettings()
|
||||||
.setPageSizeBytes(20 * 1024)
|
.setPageSizeBytes(2 * 1024)
|
||||||
.setMaxSizeBytes(200 * 1024)
|
.setMaxSizeBytes(20 * 1024)
|
||||||
.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
|
.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
|
||||||
|
|
||||||
server.getAddressSettingsRepository().addMatch("#", settings);
|
server.getAddressSettingsRepository().addMatch("#", settings);
|
||||||
|
@ -199,9 +202,9 @@ public class PagingLeakTest extends ActiveMQTestBase
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int numberOfMessages = 10000;
|
int numberOfMessages = 500;
|
||||||
|
|
||||||
Consumer consumer1 = new Consumer(100, "-1", 150);
|
Consumer consumer1 = new Consumer(10, "-1", 150);
|
||||||
Consumer consumer2 = new Consumer(0, "-2", numberOfMessages);
|
Consumer consumer2 = new Consumer(0, "-2", numberOfMessages);
|
||||||
|
|
||||||
final ServerLocator locator = createInVMLocator(0);
|
final ServerLocator locator = createInVMLocator(0);
|
||||||
|
@ -219,20 +222,21 @@ public class PagingLeakTest extends ActiveMQTestBase
|
||||||
msg.getBodyBuffer().writeBytes(b);
|
msg.getBodyBuffer().writeBytes(b);
|
||||||
producer.send(msg);
|
producer.send(msg);
|
||||||
|
|
||||||
if (i == 1000)
|
if (i == 100)
|
||||||
{
|
{
|
||||||
System.out.println("Starting consumers!!!");
|
System.out.println("Starting consumers!!!");
|
||||||
consumer1.start();
|
consumer1.start();
|
||||||
consumer2.start();
|
consumer2.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (i % 1000 == 0)
|
if (i % 250 == 0)
|
||||||
{
|
{
|
||||||
validateInstances();
|
validateInstances();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
System.out.println("Sent " + numberOfMessages);
|
||||||
|
|
||||||
consumer1.join();
|
consumer1.join();
|
||||||
consumer2.join();
|
consumer2.join();
|
||||||
|
|
|
@ -110,7 +110,7 @@ public class LargeMessageTest extends LargeMessageTestBase
|
||||||
AddressSettings settings = new AddressSettings();
|
AddressSettings settings = new AddressSettings();
|
||||||
if (redeliveryDelay)
|
if (redeliveryDelay)
|
||||||
{
|
{
|
||||||
settings.setRedeliveryDelay(1000);
|
settings.setRedeliveryDelay(100);
|
||||||
if (locator.isCompressLargeMessage())
|
if (locator.isCompressLargeMessage())
|
||||||
{
|
{
|
||||||
locator.setConsumerWindowSize(0);
|
locator.setConsumerWindowSize(0);
|
||||||
|
@ -2781,9 +2781,9 @@ public class LargeMessageTest extends LargeMessageTestBase
|
||||||
|
|
||||||
final int numberOfBytesBigMessage = 400000;
|
final int numberOfBytesBigMessage = 400000;
|
||||||
|
|
||||||
locator.setBlockOnNonDurableSend(true)
|
locator.setBlockOnNonDurableSend(false)
|
||||||
.setBlockOnDurableSend(true)
|
.setBlockOnDurableSend(false)
|
||||||
.setBlockOnAcknowledge(true)
|
.setBlockOnAcknowledge(false)
|
||||||
.setCompressLargeMessage(true);
|
.setCompressLargeMessage(true);
|
||||||
|
|
||||||
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
|
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
|
||||||
|
@ -2864,10 +2864,6 @@ public class LargeMessageTest extends LargeMessageTestBase
|
||||||
messageLarge.saveToOutputStream(bout);
|
messageLarge.saveToOutputStream(bout);
|
||||||
byte[] body = bout.toByteArray();
|
byte[] body = bout.toByteArray();
|
||||||
assertEquals(numberOfBytesBigMessage, body.length);
|
assertEquals(numberOfBytesBigMessage, body.length);
|
||||||
for (int bi = 0; bi < body.length; bi++)
|
|
||||||
{
|
|
||||||
assertEquals(getSamplebyte(bi), body[bi]);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
session.rollback();
|
session.rollback();
|
||||||
|
|
|
@ -239,7 +239,7 @@ public class BridgeTest extends ActiveMQTestBase
|
||||||
|
|
||||||
if (largeMessage)
|
if (largeMessage)
|
||||||
{
|
{
|
||||||
message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(1024 * 1024));
|
message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10 * 1024));
|
||||||
}
|
}
|
||||||
|
|
||||||
message.putIntProperty(propKey, i);
|
message.putIntProperty(propKey, i);
|
||||||
|
@ -259,7 +259,7 @@ public class BridgeTest extends ActiveMQTestBase
|
||||||
|
|
||||||
if (largeMessage)
|
if (largeMessage)
|
||||||
{
|
{
|
||||||
readMessages(message);
|
readLargeMessages(message, 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
message.acknowledge();
|
message.acknowledge();
|
||||||
|
@ -362,13 +362,13 @@ public class BridgeTest extends ActiveMQTestBase
|
||||||
.setName("bridge1")
|
.setName("bridge1")
|
||||||
.setQueueName(queueName0)
|
.setQueueName(queueName0)
|
||||||
.setForwardingAddress(forwardAddress)
|
.setForwardingAddress(forwardAddress)
|
||||||
.setRetryInterval(1000)
|
.setRetryInterval(100)
|
||||||
.setReconnectAttempts(-1)
|
.setReconnectAttempts(-1)
|
||||||
.setReconnectAttemptsOnSameNode(-1)
|
.setReconnectAttemptsOnSameNode(-1)
|
||||||
.setUseDuplicateDetection(false)
|
.setUseDuplicateDetection(false)
|
||||||
.setConfirmationWindowSize(numMessages * messageSize / 2)
|
.setConfirmationWindowSize(numMessages * messageSize / 2)
|
||||||
.setStaticConnectors(connectorConfig)
|
.setStaticConnectors(connectorConfig)
|
||||||
.setCallTimeout(5000);
|
.setCallTimeout(500);
|
||||||
|
|
||||||
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
|
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
|
||||||
bridgeConfigs.add(bridgeConfiguration);
|
bridgeConfigs.add(bridgeConfiguration);
|
||||||
|
@ -418,7 +418,7 @@ public class BridgeTest extends ActiveMQTestBase
|
||||||
|
|
||||||
if (largeMessage)
|
if (largeMessage)
|
||||||
{
|
{
|
||||||
message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(1024 * 1024));
|
message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10 * 1024));
|
||||||
}
|
}
|
||||||
|
|
||||||
message.putIntProperty(propKey, i);
|
message.putIntProperty(propKey, i);
|
||||||
|
@ -443,7 +443,7 @@ public class BridgeTest extends ActiveMQTestBase
|
||||||
|
|
||||||
if (largeMessage)
|
if (largeMessage)
|
||||||
{
|
{
|
||||||
readMessages(message);
|
readLargeMessages(message, 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
message.acknowledge();
|
message.acknowledge();
|
||||||
|
@ -480,11 +480,11 @@ public class BridgeTest extends ActiveMQTestBase
|
||||||
/**
|
/**
|
||||||
* @param message
|
* @param message
|
||||||
*/
|
*/
|
||||||
private void readMessages(final ClientMessage message)
|
private void readLargeMessages(final ClientMessage message, int kiloBlocks)
|
||||||
{
|
{
|
||||||
byte[] byteRead = new byte[1024];
|
byte[] byteRead = new byte[1024];
|
||||||
|
|
||||||
for (int j = 0; j < 1024; j++)
|
for (int j = 0; j < kiloBlocks; j++)
|
||||||
{
|
{
|
||||||
message.getBodyBuffer().readBytes(byteRead);
|
message.getBodyBuffer().readBytes(byteRead);
|
||||||
}
|
}
|
||||||
|
@ -603,7 +603,7 @@ public class BridgeTest extends ActiveMQTestBase
|
||||||
|
|
||||||
if (largeMessage)
|
if (largeMessage)
|
||||||
{
|
{
|
||||||
message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(1024 * 1024));
|
message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10 * 1024));
|
||||||
}
|
}
|
||||||
|
|
||||||
producer0.send(message);
|
producer0.send(message);
|
||||||
|
@ -621,7 +621,7 @@ public class BridgeTest extends ActiveMQTestBase
|
||||||
|
|
||||||
if (largeMessage)
|
if (largeMessage)
|
||||||
{
|
{
|
||||||
message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(1024 * 1024));
|
message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10 * 1024));
|
||||||
}
|
}
|
||||||
|
|
||||||
producer0.send(message);
|
producer0.send(message);
|
||||||
|
@ -641,7 +641,7 @@ public class BridgeTest extends ActiveMQTestBase
|
||||||
|
|
||||||
if (largeMessage)
|
if (largeMessage)
|
||||||
{
|
{
|
||||||
readMessages(message);
|
readLargeMessages(message, 10);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1333,9 +1333,9 @@ public class BridgeTest extends ActiveMQTestBase
|
||||||
ActiveMQServer server0 = null;
|
ActiveMQServer server0 = null;
|
||||||
ActiveMQServer server1 = null;
|
ActiveMQServer server1 = null;
|
||||||
|
|
||||||
final int PAGE_MAX = 100 * 1024;
|
final int PAGE_MAX = 10 * 1024;
|
||||||
|
|
||||||
final int PAGE_SIZE = 10 * 1024;
|
final int PAGE_SIZE = 1 * 1024;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
|
||||||
|
@ -1345,6 +1345,7 @@ public class BridgeTest extends ActiveMQTestBase
|
||||||
Map<String, Object> server1Params = new HashMap<String, Object>();
|
Map<String, Object> server1Params = new HashMap<String, Object>();
|
||||||
addTargetParameters(server1Params);
|
addTargetParameters(server1Params);
|
||||||
server1 = createClusteredServerWithParams(isNetty(), 1, true, PAGE_SIZE, PAGE_MAX, server1Params);
|
server1 = createClusteredServerWithParams(isNetty(), 1, true, PAGE_SIZE, PAGE_MAX, server1Params);
|
||||||
|
server1.getConfiguration().setJournalBufferTimeout_AIO(0).setJournalBufferTimeout_NIO(0);
|
||||||
|
|
||||||
final String testAddress = "testAddress";
|
final String testAddress = "testAddress";
|
||||||
final String queueName0 = "queue0";
|
final String queueName0 = "queue0";
|
||||||
|
@ -1359,7 +1360,7 @@ public class BridgeTest extends ActiveMQTestBase
|
||||||
|
|
||||||
server0.getConfiguration().setConnectorConfigurations(connectors);
|
server0.getConfiguration().setConnectorConfigurations(connectors);
|
||||||
|
|
||||||
server0.getConfiguration().setIDCacheSize(20000);
|
server0.getConfiguration().setIDCacheSize(20000).setJournalBufferTimeout_NIO(0).setJournalBufferTimeout_AIO(0);
|
||||||
|
|
||||||
ArrayList<String> staticConnectors = new ArrayList<String>();
|
ArrayList<String> staticConnectors = new ArrayList<String>();
|
||||||
staticConnectors.add(server1tc.getName());
|
staticConnectors.add(server1tc.getName());
|
||||||
|
@ -1420,7 +1421,7 @@ public class BridgeTest extends ActiveMQTestBase
|
||||||
|
|
||||||
session1.start();
|
session1.start();
|
||||||
|
|
||||||
final int numMessages = 6000;
|
final int numMessages = 200;
|
||||||
|
|
||||||
final SimpleString propKey = new SimpleString("testkey");
|
final SimpleString propKey = new SimpleString("testkey");
|
||||||
|
|
||||||
|
@ -1570,11 +1571,7 @@ public class BridgeTest extends ActiveMQTestBase
|
||||||
public synchronized boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException
|
public synchronized boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException
|
||||||
{
|
{
|
||||||
|
|
||||||
if (packet instanceof SessionSendMessage && count == 1000)
|
if (packet instanceof SessionSendMessage && ++count == 100)
|
||||||
{
|
|
||||||
System.out.println("Going to kill the server");
|
|
||||||
}
|
|
||||||
if (packet instanceof SessionSendMessage && ++count == 5000)
|
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue