Changes for https://issues.apache.org/jira/browse/AMQ-4165 : Remove pure master/slave functionality

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1413846 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-11-26 21:13:25 +00:00
parent de30d11f94
commit 30f531d741
16 changed files with 169 additions and 587 deletions

View File

@ -168,7 +168,6 @@ public class BrokerService implements Service {
private File schedulerDirectoryFile;
private Scheduler scheduler;
private ThreadPoolExecutor executor;
private boolean slave = true;
private int schedulePeriodForDestinationPurge= 0;
private int maxPurgedDestinationsPerSweep = 0;
private BrokerContext brokerContext;
@ -392,13 +391,6 @@ public class BrokerService implements Service {
return null;
}
/**
* @return true if this Broker is a slave to a Master
*/
public boolean isSlave() {
return slave;
}
public void masterFailed() {
if (shutdownOnMasterFailure) {
LOG.error("The Master has failed ... shutting down");
@ -578,7 +570,6 @@ public class BrokerService implements Service {
if (startException != null) {
return;
}
slave = false;
startDestinations();
addShutdownHook();
@ -604,9 +595,7 @@ public class BrokerService implements Service {
adminView.setBroker(managedBroker);
}
if (!isSlave()) {
startAllConnectors();
}
startAllConnectors();
if (ioExceptionHandler == null) {
setIoExceptionHandler(new DefaultIOExceptionHandler());
@ -680,7 +669,6 @@ public class BrokerService implements Service {
try {
stopper.stop(persistenceAdapter);
persistenceAdapter = null;
slave = true;
if (isUseJmx()) {
stopper.stop(getManagementContext());
managementContext = null;
@ -1227,8 +1215,7 @@ public class BrokerService implements Service {
}
/**
* Sets the services associated with this broker such as a
* {@link MasterConnector}
* Sets the services associated with this broker.
*/
public void setServices(Service[] services) {
this.services.clear();
@ -2246,82 +2233,80 @@ public class BrokerService implements Service {
* @throws Exception
*/
public void startAllConnectors() throws Exception {
if (!isSlave()) {
Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
List<TransportConnector> al = new ArrayList<TransportConnector>();
for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
TransportConnector connector = iter.next();
connector.setBrokerService(this);
al.add(startTransportConnector(connector));
}
if (al.size() > 0) {
// let's clear the transportConnectors list and replace it with
// the started transportConnector instances
this.transportConnectors.clear();
setTransportConnectors(al);
}
URI uri = getVmConnectorURI();
Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
map.put("network", "true");
map.put("async", "false");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
List<TransportConnector> al = new ArrayList<TransportConnector>();
for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
TransportConnector connector = iter.next();
connector.setBrokerService(this);
al.add(startTransportConnector(connector));
}
if (al.size() > 0) {
// let's clear the transportConnectors list and replace it with
// the started transportConnector instances
this.transportConnectors.clear();
setTransportConnectors(al);
}
URI uri = getVmConnectorURI();
Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
map.put("network", "true");
map.put("async", "false");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
if (!stopped.get()) {
ThreadPoolExecutor networkConnectorStartExecutor = null;
if (isNetworkConnectorStartAsync()) {
// spin up as many threads as needed
networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new ThreadFactory() {
int count=0;
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
thread.setDaemon(true);
return thread;
}
});
}
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
final NetworkConnector connector = iter.next();
connector.setLocalUri(uri);
connector.setBrokerName(getBrokerName());
connector.setDurableDestinations(durableDestinations);
if (getDefaultSocketURIString() != null) {
connector.setBrokerURL(getDefaultSocketURIString());
}
if (networkConnectorStartExecutor != null) {
networkConnectorStartExecutor.execute(new Runnable() {
public void run() {
try {
LOG.info("Async start of " + connector);
connector.start();
} catch(Exception e) {
LOG.error("Async start of network connector: " + connector + " failed", e);
}
if (!stopped.get()) {
ThreadPoolExecutor networkConnectorStartExecutor = null;
if (isNetworkConnectorStartAsync()) {
// spin up as many threads as needed
networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new ThreadFactory() {
int count=0;
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
thread.setDaemon(true);
return thread;
}
});
} else {
connector.start();
}
}
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
final NetworkConnector connector = iter.next();
connector.setLocalUri(uri);
connector.setBrokerName(getBrokerName());
connector.setDurableDestinations(durableDestinations);
if (getDefaultSocketURIString() != null) {
connector.setBrokerURL(getDefaultSocketURIString());
}
if (networkConnectorStartExecutor != null) {
// executor done when enqueued tasks are complete
ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
networkConnectorStartExecutor.execute(new Runnable() {
public void run() {
try {
LOG.info("Async start of " + connector);
connector.start();
} catch(Exception e) {
LOG.error("Async start of network connector: " + connector + " failed", e);
}
}
});
} else {
connector.start();
}
}
if (networkConnectorStartExecutor != null) {
// executor done when enqueued tasks are complete
ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
}
for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
ProxyConnector connector = iter.next();
connector.start();
}
for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
JmsConnector connector = iter.next();
connector.start();
}
for (Service service : services) {
configureService(service);
service.start();
}
for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
ProxyConnector connector = iter.next();
connector.start();
}
for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
JmsConnector connector = iter.next();
connector.start();
}
for (Service service : services) {
configureService(service);
service.start();
}
}
}

View File

@ -295,13 +295,6 @@ public class ConnectionContext {
return dontSendReponse;
}
/**
* @return the slave
*/
public boolean isSlave() {
return (this.broker != null && this.broker.getBrokerService().isSlave()) || !this.clientMaster;
}
/**
* @return the clientMaster
*/

View File

@ -186,10 +186,6 @@ public class BrokerView implements BrokerViewMBean {
return brokerService.isPersistent();
}
public boolean isSlave() {
return brokerService.isSlave();
}
public void terminateJVM(int exitCode) {
System.exit(exitCode);
}

View File

@ -115,9 +115,6 @@ public interface BrokerViewMBean extends Service {
@MBeanInfo("Messages are synchronized to disk.")
boolean isPersistent();
@MBeanInfo("Slave broker.")
boolean isSlave();
/**
* Shuts down the JVM.
*

View File

@ -116,10 +116,6 @@ public abstract class AbstractSubscription implements Subscription {
public void gc() {
}
public boolean isSlave() {
return broker.getBrokerService().isSlave();
}
public ConnectionContext getContext() {
return context;
}

View File

@ -92,7 +92,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
// The slave should not deliver pull messages.
// TODO: when the slave becomes a master, He should send a NULL message to all the
// consumers to 'wake them up' in case they were waiting for a message.
if (getPrefetchSize() == 0 && !isSlave()) {
if (getPrefetchSize() == 0) {
prefetchExtension.incrementAndGet();
final long dispatchCounterBeforePull = dispatchCounter;
@ -194,13 +194,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
boolean callDispatchMatched = false;
Destination destination = null;
if (!isSlave()) {
if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
// suppress unexpected ack exception in this expected case
LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: " + ack);
return;
}
if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
// suppress unexpected ack exception in this expected case
LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: " + ack);
return;
}
if (LOG.isTraceEnabled()) {
LOG.trace("ack:" + ack);
}
@ -413,15 +412,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
destination.wakeup();
dispatchPending();
} else {
if (isSlave()) {
throw new JMSException(
"Slave broker out of sync with master: Acknowledgment ("
+ ack + ") was not in the dispatch list: "
+ dispatched);
} else {
LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
+ ack);
}
LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
+ ack);
}
}
@ -447,11 +439,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
@Override
public void afterRollback() throws Exception {
synchronized(dispatchLock) {
if (isSlave()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
} else {
// poisionAck will decrement - otherwise still inflight on client
}
// poisionAck will decrement - otherwise still inflight on client
}
}
});
@ -617,53 +605,51 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
protected void dispatchPending() throws IOException {
if (!isSlave()) {
synchronized(pendingLock) {
try {
int numberToDispatch = countBeforeFull();
if (numberToDispatch > 0) {
setSlowConsumer(false);
setPendingBatchSize(pending, numberToDispatch);
int count = 0;
pending.reset();
while (pending.hasNext() && !isFull()
&& count < numberToDispatch) {
MessageReference node = pending.next();
if (node == null) {
break;
}
// Synchronize between dispatched list and remove of message from pending list
// related to remove subscription action
synchronized(dispatchLock) {
pending.remove();
node.decrementReferenceCount();
if( !isDropped(node) && canDispatch(node)) {
// Message may have been sitting in the pending
// list a while waiting for the consumer to ak the message.
if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
//increment number to dispatch
numberToDispatch++;
if (broker.isExpired(node)) {
((Destination)node.getRegionDestination()).messageExpired(context, this, node);
}
continue;
}
dispatch(node);
count++;
}
}
synchronized(pendingLock) {
try {
int numberToDispatch = countBeforeFull();
if (numberToDispatch > 0) {
setSlowConsumer(false);
setPendingBatchSize(pending, numberToDispatch);
int count = 0;
pending.reset();
while (pending.hasNext() && !isFull()
&& count < numberToDispatch) {
MessageReference node = pending.next();
if (node == null) {
break;
}
} else if (!isSlowConsumer()) {
setSlowConsumer(true);
for (Destination dest :destinations) {
dest.slowConsumer(context, this);
// Synchronize between dispatched list and remove of message from pending list
// related to remove subscription action
synchronized(dispatchLock) {
pending.remove();
node.decrementReferenceCount();
if( !isDropped(node) && canDispatch(node)) {
// Message may have been sitting in the pending
// list a while waiting for the consumer to ak the message.
if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
//increment number to dispatch
numberToDispatch++;
if (broker.isExpired(node)) {
((Destination)node.getRegionDestination()).messageExpired(context, this, node);
}
continue;
}
dispatch(node);
count++;
}
}
}
} finally {
pending.release();
} else if (!isSlowConsumer()) {
setSlowConsumer(true);
for (Destination dest :destinations) {
dest.slowConsumer(context, this);
}
}
} finally {
pending.release();
}
}
}
@ -682,42 +668,37 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
okForAckAsDispatchDone.countDown();
// No reentrant lock - Patch needed to IndirectMessageReference on method lock
if (!isSlave()) {
MessageDispatch md = createMessageDispatch(node, message);
// NULL messages don't count... they don't get Acked.
if (node != QueueMessageReference.NULL_MESSAGE) {
dispatchCounter++;
dispatched.add(node);
} else {
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - 1);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
MessageDispatch md = createMessageDispatch(node, message);
// NULL messages don't count... they don't get Acked.
if (node != QueueMessageReference.NULL_MESSAGE) {
dispatchCounter++;
dispatched.add(node);
} else {
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - 1);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
if (info.isDispatchAsync()) {
md.setTransmitCallback(new Runnable() {
public void run() {
// Since the message gets queued up in async dispatch,
// we don't want to
// decrease the reference count until it gets put on the
// wire.
onDispatch(node, message);
}
});
context.getConnection().dispatchAsync(md);
} else {
context.getConnection().dispatchSync(md);
onDispatch(node, message);
}
return true;
} else {
return false;
}
if (info.isDispatchAsync()) {
md.setTransmitCallback(new Runnable() {
public void run() {
// Since the message gets queued up in async dispatch,
// we don't want to
// decrease the reference count until it gets put on the
// wire.
onDispatch(node, message);
}
});
context.getConnection().dispatchAsync(md);
} else {
context.getConnection().dispatchSync(md);
onDispatch(node, message);
}
return true;
}
protected void onDispatch(final MessageReference node, final Message message) {

View File

@ -471,13 +471,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
browserDispatches.add(browserDispatch);
}
if (!(this.optimizedDispatch || isSlave())) {
if (!this.optimizedDispatch) {
wakeup();
}
}finally {
pagedInPendingDispatchLock.writeLock().unlock();
}
if (this.optimizedDispatch || isSlave()) {
if (this.optimizedDispatch) {
// Outside of dispatchLock() to maintain the lock hierarchy of
// iteratingMutex -> dispatchLock. - see
// https://issues.apache.org/activemq/browse/AMQ-1878
@ -578,13 +578,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}finally {
consumersLock.writeLock().unlock();
}
if (!(this.optimizedDispatch || isSlave())) {
if (!this.optimizedDispatch) {
wakeup();
}
}finally {
pagedInPendingDispatchLock.writeLock().unlock();
}
if (this.optimizedDispatch || isSlave()) {
if (this.optimizedDispatch) {
// Outside of dispatchLock() to maintain the lock hierarchy of
// iteratingMutex -> dispatchLock. - see
// https://issues.apache.org/activemq/browse/AMQ-1878
@ -1704,7 +1704,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
public void wakeup() {
if ((optimizedDispatch || isSlave()) && !iterationRunning) {
if (optimizedDispatch && !iterationRunning) {
iterate();
pendingWakeups.incrementAndGet();
} else {
@ -1721,10 +1721,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
private boolean isSlave() {
return broker.getBrokerService().isSlave();
}
private void doPageIn(boolean force) throws Exception {
PendingList newlyPaged = doPageInForDispatch(force);
pagedInPendingDispatchLock.writeLock().lock();
@ -1875,7 +1871,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
consumersLock.writeLock().lock();
try {
if (this.consumers.isEmpty() || isSlave()) {
if (this.consumers.isEmpty()) {
// slave dispatch happens in processDispatchNotification
return list;
}

View File

@ -695,10 +695,6 @@ public class RegionBroker extends EmptyBroker {
}
}
public boolean isSlaveBroker() {
return brokerService.isSlave();
}
@Override
public boolean isStopped() {
return !started;

View File

@ -109,12 +109,7 @@ public interface Subscription extends SubscriptionRecovery {
* @throws Exception
*/
void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception;
/**
* @return true if the broker is currently in slave mode
*/
boolean isSlave();
/**
* @return number of messages pending delivery
*/

View File

@ -101,7 +101,7 @@ public class TopicSubscription extends AbstractSubscription {
return;
}
enqueueCounter.incrementAndGet();
if (!isFull() && matched.isEmpty() && !isSlave()) {
if (!isFull() && matched.isEmpty()) {
// if maximumPendingMessages is set we will only discard messages which
// have not been dispatched (i.e. we allow the prefetch buffer to be filled)
dispatch(node);
@ -299,7 +299,7 @@ public class TopicSubscription extends AbstractSubscription {
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
// The slave should not deliver pull messages.
if (getPrefetchSize() == 0 && !isSlave()) {
if (getPrefetchSize() == 0 ) {
prefetchWindowOpen.set(true);
dispatchMatched();

View File

@ -338,7 +338,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
assertTrue("broker is not a slave", !broker.isSlave());
// create 2 topics
broker.addTopic(getDestinationString() + "1 ");
broker.addTopic(" " + getDestinationString() + "2");
@ -536,7 +535,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
assertTrue("broker is not a slave", !broker.isSlave());
// create 2 topics
broker.addTopic(getDestinationString() + "1");
broker.addTopic(getDestinationString() + "2");
@ -588,7 +586,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
assertTrue("broker is not a slave", !broker.isSlave());
// create 2 topics
broker.addTopic(getDestinationString() + "1");
broker.addTopic(getDestinationString() + "2");
@ -797,7 +794,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
assertTrue("broker is not a slave", !broker.isSlave());
assertEquals(0, broker.getDynamicDestinationProducers().length);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -234,10 +234,6 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
return false;
}
public boolean isSlave() {
return false;
}
public boolean matches(MessageReference node,
MessageEvaluationContext context) throws IOException {
return true;

View File

@ -1,338 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.discovery;
import java.io.File;
import java.net.URI;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MasterSlaveDiscoveryTest extends TestCase {
private static final Log LOG = LogFactory.getLog(MasterSlaveDiscoveryTest.class);
private static final int NUMBER = 10;
private static final String BROKER_A_DIRECTORY = "target/activemq-data/kahadbA";
private static final String BROKER_A1_NAME = "BROKERA1";
private static final String BROKER_A1_BIND_ADDRESS = "tcp://127.0.0.1:61616";
private static final String BROKER_A2_NAME = "BROKERA2";
private static final String BROKER_A2_BIND_ADDRESS = "tcp://127.0.0.1:61617";
private static final String BROKER_B_DIRECTORY = "target/activemq-data/kahadbB";
private static final String BROKER_B1_NAME = "BROKERB1";
private static final String BROKER_B1_BIND_ADDRESS = "tcp://127.0.0.1:61626";
private static final String BROKER_B2_NAME = "BROKERB2";
private static final String BROKER_B2_BIND_ADDRESS = "tcp://127.0.0.1:61627";
private BrokerService brokerA1;
private BrokerService brokerA2;
private BrokerService brokerB1;
private BrokerService brokerB2;
private String clientUrlA;
private String clientUrlB;
public void testNetworkFailback() throws Exception {
final long timeout = 5000; // 5 seconds
final String queueName = getClass().getName();
ActiveMQConnectionFactory factoryA = new ActiveMQConnectionFactory(clientUrlA);
ActiveMQConnection connectionA = (ActiveMQConnection) factoryA.createConnection();
connectionA.start();
Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queueA = sessionA.createQueue(queueName);
MessageProducer producerA = sessionA.createProducer(queueA);
ActiveMQConnectionFactory factoryB = new ActiveMQConnectionFactory(clientUrlB);
ActiveMQConnection connectionB = (ActiveMQConnection) factoryB.createConnection();
connectionB.start();
Session sessionB = connectionB.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queueB = sessionB.createQueue(queueName);
MessageConsumer consumerB = sessionA.createConsumer(queueB);
// Test initial configuration is working
String msgStr = queueName + "-" + System.currentTimeMillis();
Message msgSent = sessionA.createTextMessage(msgStr);
producerA.send(msgSent);
Message msgReceived = null;
try {
msgReceived = consumerB.receive(timeout);
} catch (JMSException e) {
fail("Message Timeout");
}
assertTrue(msgReceived instanceof TextMessage);
assertEquals(((TextMessage) msgReceived).getText(), msgStr);
// Test Failover
assertTrue(brokerB2.isSlave());
brokerB1.stop();
brokerB2.waitUntilStarted();
assertFalse(brokerB2.isSlave());
msgStr = queueName + "-" + System.currentTimeMillis();
msgSent = sessionA.createTextMessage(msgStr);
producerA.send(msgSent);
try {
msgReceived = consumerB.receive(timeout);
} catch (JMSException e) {
fail("Message Timeout");
}
assertTrue(msgReceived instanceof TextMessage);
assertEquals(((TextMessage)msgReceived).getText(), msgStr);
// Test Failback
new Thread(new Runnable() {
@Override
public void run() {
try {
brokerB1.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to start broker");
}
}
}, "BrokerB1 Restarting").start();
brokerB1.waitUntilStarted();
assertTrue(brokerB1.isSlave());
new Thread(new Runnable() {
@Override
public void run() {
try {
brokerB2.stop();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to stop broker");
}
}
}, "BrokerB2 Stopping").start();
brokerB2.waitUntilStopped();
brokerB1.waitUntilStarted();
msgStr = queueName + "-" + System.currentTimeMillis();
msgSent = sessionA.createTextMessage(msgStr);
producerA.send(msgSent);
try {
msgReceived = consumerB.receive(timeout);
} catch (JMSException e) {
fail("Message Timeout");
}
assertTrue(msgReceived instanceof TextMessage);
assertEquals(((TextMessage)msgReceived).getText(), msgStr);
connectionA.close();
connectionB.close();
}
@Override
protected void setUp() throws Exception {
brokerA1 = createBrokerA1();
brokerA1.waitUntilStarted(); // wait to ensure A1 is master
brokerA2 = createBrokerA2();
String connectStringA1 = brokerA1.getTransportConnectors().get(0).getPublishableConnectString();
String connectStringA2 = brokerA2.getTransportConnectors().get(0).getPublishableConnectString();
clientUrlA = "failover:(" + connectStringA1 + "," + connectStringA2 + ")?randomize=false&updateURIsSupported=false";
brokerB1 = createBrokerB1();
brokerB1.waitUntilStarted(); // wait to ensure B1 is master
brokerB2 = createBrokerB2();
String connectStringB1 = brokerB1.getTransportConnectors().get(0).getPublishableConnectString();
String connectStringB2 = brokerB2.getTransportConnectors().get(0).getPublishableConnectString();
clientUrlB = "failover:(" + connectStringB1 + "," + connectStringB2 + ")?randomize=false&updateURIsSupported=false";
}
@Override
protected void tearDown() throws Exception {
if (brokerB2 != null) {
brokerB2.stop();
brokerB2 = null;
}
if (brokerB1 != null) {
brokerB1.stop();
brokerB1 = null;
}
if (brokerA1 != null) {
brokerA1.stop();
brokerA1 = null;
}
if (brokerA2 != null) {
brokerA2.stop();
brokerA2 = null;
}
}
protected BrokerService createBrokerA1() throws Exception {
final BrokerService answer = new BrokerService();
answer.setUseJmx(false);
answer.setBrokerName(BROKER_A1_NAME);
File directory = new File(BROKER_A_DIRECTORY);
IOHelper.deleteChildren(directory);
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(directory);
answer.setPersistent(true);
answer.setPersistenceAdapter(kaha);
NetworkConnector network = answer.addNetworkConnector("masterslave:(" + BROKER_B1_BIND_ADDRESS + "," + BROKER_B2_BIND_ADDRESS + ")?useExponentialBackOff=false&discovered.randomize=true&discovered.maxReconnectAttempts=0");
network.setDuplex(false);
// lazy create
TransportConnector transportConnector = new TransportConnector();
transportConnector.setUri(new URI(BROKER_A1_BIND_ADDRESS));
answer.addConnector(transportConnector);
answer.setUseShutdownHook(false);
answer.start();
return answer;
}
protected BrokerService createBrokerA2() throws Exception {
final BrokerService answer = new BrokerService();
answer.setUseJmx(false);
answer.setBrokerName(BROKER_A2_NAME);
File directory = new File(BROKER_A_DIRECTORY);
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(directory);
answer.setPersistent(true);
answer.setPersistenceAdapter(kaha);
// it is possible to *replace* the default implied failover options via..
NetworkConnector network = answer.addNetworkConnector("masterslave:(" + BROKER_B1_BIND_ADDRESS + "," + BROKER_B2_BIND_ADDRESS + ")");
network.setDuplex(false);
// lazy create
TransportConnector transportConnector = new TransportConnector();
transportConnector.setUri(new URI(BROKER_A2_BIND_ADDRESS));
answer.addConnector(transportConnector);
answer.setUseShutdownHook(false);
new Thread(new Runnable() {
@Override
public void run() {
try {
answer.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to start broker");
}
}
}, "BrokerA2 Starting").start();
return answer;
}
protected BrokerService createBrokerB1() throws Exception {
final BrokerService answer = new BrokerService();
answer.setUseJmx(false);
answer.setBrokerName(BROKER_B1_NAME);
File directory = new File(BROKER_B_DIRECTORY);
IOHelper.deleteChildren(directory);
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(directory);
answer.setPersistent(true);
answer.setPersistenceAdapter(kaha);
NetworkConnector network = answer.addNetworkConnector("masterslave:(" + BROKER_A1_BIND_ADDRESS + "," + BROKER_A2_BIND_ADDRESS + ")");
network.setDuplex(false);
// lazy create
TransportConnector transportConnector = new TransportConnector();
transportConnector.setUri(new URI(BROKER_B1_BIND_ADDRESS));
answer.addConnector(transportConnector);
answer.setUseShutdownHook(false);
answer.start();
return answer;
}
protected BrokerService createBrokerB2() throws Exception {
final BrokerService answer = new BrokerService();
answer.setUseJmx(false);
answer.setBrokerName(BROKER_B2_NAME);
File directory = new File(BROKER_B_DIRECTORY);
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(directory);
answer.setPersistent(true);
answer.setPersistenceAdapter(kaha);
NetworkConnector network = answer.addNetworkConnector("masterslave:(" + BROKER_A1_BIND_ADDRESS + "," + BROKER_A2_BIND_ADDRESS + ")");
network.setDuplex(false);
// lazy create
TransportConnector transportConnector = new TransportConnector();
transportConnector.setUri(new URI(BROKER_B2_BIND_ADDRESS));
answer.addConnector(transportConnector);
answer.setUseShutdownHook(false);
new Thread(new Runnable() {
@Override
public void run() {
try {
answer.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to start broker");
}
}
}, "BrokerB2 Starting").start();
return answer;
}
}

View File

@ -66,7 +66,6 @@ public class ApplicationContextFilter implements Filter {
private String applicationContextName = "applicationContext";
private String requestContextName = "requestContext";
private String requestName = "request";
private final String slavePage = "slave.jsp";
public void init(FilterConfig config) throws ServletException {
this.servletContext = config.getServletContext();
@ -85,19 +84,19 @@ public class ApplicationContextFilter implements Filter {
Map requestContextWrapper = createRequestContextWrapper(request);
String path = ((HttpServletRequest)request).getRequestURI();
// handle slave brokers
try {
if ( !(path.endsWith("css") || path.endsWith("png") || path.endsWith("ico") || path.endsWith(slavePage))
&& ((BrokerFacade)requestContextWrapper.get("brokerQuery")).isSlave()) {
((HttpServletResponse)response).sendRedirect(slavePage);
return;
}
} catch (Exception e) {
LOG.warn(path + ", failed to access BrokerFacade: reason: " + e.getLocalizedMessage());
if (LOG.isDebugEnabled()) {
LOG.debug(request.toString(), e);
}
throw new IOException(e);
}
// try {
// if ( !(path.endsWith("css") || path.endsWith("png") || path.endsWith("ico") || path.endsWith(slavePage))
// && ((BrokerFacade)requestContextWrapper.get("brokerQuery")).isSlave()) {
// ((HttpServletResponse)response).sendRedirect(slavePage);
// return;
// }
// } catch (Exception e) {
// LOG.warn(path + ", failed to access BrokerFacade: reason: " + e.getLocalizedMessage());
// if (LOG.isDebugEnabled()) {
// LOG.debug(request.toString(), e);
// }
// throw new IOException(e);
// }
request.setAttribute(requestContextName, requestContextWrapper);
request.setAttribute(requestName, request);
chain.doFilter(request, response);

View File

@ -209,6 +209,4 @@ public interface BrokerFacade {
boolean isJobSchedulerStarted();
boolean isSlave() throws Exception;
}

View File

@ -226,8 +226,4 @@ public abstract class BrokerFacadeSupport implements BrokerFacade {
return false;
}
}
public boolean isSlave() throws Exception {
return getBrokerAdmin().isSlave();
}
}