More tests and cleanup
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-08-01 12:27:34 -04:00
parent e73ab34837
commit eb9c584fbd
3 changed files with 137 additions and 7 deletions

View File

@ -19,7 +19,8 @@ package org.apache.activemq.command;
import org.apache.activemq.state.CommandVisitor;
/**
* Used to represent a durable subscription.
* Used to represent the durable subscriptions contained by the broker
* This is used to synchronize durable subs on bridge creation
*
* @openwire:marshaller code="92"
*

View File

@ -21,6 +21,7 @@ import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.MessageConsumer;
import javax.jms.Session;
@ -32,12 +33,16 @@ import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@ -61,6 +66,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
private Session session1;
private final FLOW flow;
@Rule
public Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS);
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
@ -209,6 +217,43 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
}
@Test
public void testSyncLoadTest() throws Exception {
String subName = this.subName;
//Create 1000 subs
for (int i = 0; i < 100; i++) {
for (int j = 0; j < 10; j++) {
session1.createDurableSubscriber(new ActiveMQTopic("include.test." + i), subName + i + j).close();
}
}
for (int i = 0; i < 100; i++) {
assertNCDurableSubsCount(broker2, new ActiveMQTopic("include.test." + i), 1);
}
doTearDown();
restartBroker(broker1, false);
//with bridge off, remove 100 subs
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
removeSubscription(broker1, new ActiveMQTopic("include.test." + i), subName + i + j);
}
}
//restart test that 900 are resynced and 100 are deleted
restartBrokers(true);
for (int i = 0; i < 10; i++) {
assertNCDurableSubsCount(broker2, new ActiveMQTopic("include.test." + i), 0);
}
for (int i = 10; i < 100; i++) {
assertNCDurableSubsCount(broker2, new ActiveMQTopic("include.test." + i), 1);
}
}
/**
* Using an older version of openwire should not sync but the network bridge
* should still start without error
@ -395,10 +440,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
localConnection.setClientID("clientId");
localConnection.start();
if (startNetworkConnector) { // brokerService.setPlugins(new BrokerPlugin[] {new
// JavaRuntimeConfigurationPlugin()});
// brokerService.setUseVirtualDestSubs(true);
// brokerService.setUseVirtualDestSubsOnCreation(isUsevirtualDestinationSubscriptionsOnCreation);
if (startNetworkConnector) {
Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
@ -439,8 +481,12 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConnector) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setMonitorConnectionSplits(true);
brokerService.setDataDirectoryFile(dataDir);
brokerService.setBrokerName("localBroker");
brokerService.setDataDirectoryFile(dataDir);
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setDirectory(dataDir);
adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name());
brokerService.setPersistenceAdapter(adapter);
if (startNetworkConnector) {
brokerService.addNetworkConnector(configureLocalNetworkConnector());
@ -477,10 +523,15 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
brokerService.setBrokerName("remoteBroker");
brokerService.setUseJmx(false);
brokerService.setDataDirectoryFile(dataDir);
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setDirectory(dataDir);
adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name());
brokerService.setPersistenceAdapter(adapter);
remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
brokerService.addConnector("tcp://localhost:" + port + "?wireFormat.version=" + remoteBrokerWireFormatVersion);
//Need a larger cache size in order to handle all of the durables
brokerService.addConnector("tcp://localhost:" + port + "?wireFormat.cacheSize=2048&wireFormat.version=" + remoteBrokerWireFormatVersion);
return brokerService;
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.command.BrokerSubscriptionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BrokerSubscriptionInfoTest {
static final Logger LOG = LoggerFactory.getLogger(BrokerSubscriptionInfoTest.class);
@Test
public void testMarshalClientProperties() throws IOException {
// marshal object
OpenWireFormatFactory factory = new OpenWireFormatFactory();
factory.setCacheEnabled(true);
OpenWireFormat wf = (OpenWireFormat)factory.createWireFormat();
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
DataOutputStream ds = new DataOutputStream(buffer);
ConsumerInfo info = new ConsumerInfo();
info.setClientId("clientId");
info.setConsumerId(new ConsumerId());
int size = 1000;
ConsumerInfo infos[] = new ConsumerInfo[size];
for (int i = 0; i < infos.length; i++) {
infos[i] = info;
}
BrokerSubscriptionInfo bsi = new BrokerSubscriptionInfo();
bsi.setSubscriptionInfos(infos);
wf.marshal(bsi, ds);
ds.close();
// unmarshal object and check that the properties are present.
ByteArrayInputStream in = new ByteArrayInputStream(buffer.toByteArray());
DataInputStream dis = new DataInputStream(in);
BrokerSubscriptionInfo actual = (BrokerSubscriptionInfo) wf.unmarshal(dis);
//assertTrue(actual instanceof BrokerSubscriptionInfo);
assertEquals(size, actual.getSubscriptionInfos().length);
}
}