Added master/slave functionality to the Broker

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@370223 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-01-18 19:16:58 +00:00
parent 377780985c
commit 4098942c1e
50 changed files with 1878 additions and 114 deletions

View File

@ -41,6 +41,7 @@ import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
@ -77,6 +78,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
protected final List dispatchQueue = Collections.synchronizedList(new LinkedList());
protected final TaskRunner taskRunner;
protected final Connector connector;
protected BrokerInfo brokerInfo;
private ConnectionStatistics statistics = new ConnectionStatistics();
private boolean inServiceException=false;
@ -149,6 +151,9 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
} catch (Throwable ignore) {
}
}
if (brokerInfo != null){
broker.removeBroker(this, brokerInfo);
}
}
public void serviceTransportException(IOException e) {
@ -337,8 +342,14 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
broker.acknowledge(lookupConnectionState(ack.getConsumerId()).getContext(), ack);
return null;
}
public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Throwable{
broker.processDispatchNotification(notification);
return null;
}
public Response processBrokerInfo(BrokerInfo info) {
broker.addBroker(this, info);
return null;
}
@ -501,6 +512,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
MessageDispatch md = (MessageDispatch) command;
Runnable sub = (Runnable) md.getConsumer();
broker.processDispatch(md);
try {
dispatch( command );
@ -516,6 +528,10 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
public void dispatchAsync(Command message) {
if (message.isMessageDispatch()){
MessageDispatch md = (MessageDispatch) message;
broker.processDispatch(md);
}
if( taskRunner==null ) {
dispatchSync( message );
} else {

View File

@ -20,7 +20,10 @@ import org.apache.activemq.Service;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
@ -33,6 +36,13 @@ import org.apache.activemq.command.TransactionId;
* @version $Revision: 1.8 $
*/
public interface Broker extends Region, Service {
/**
* Get a Broker from the Broker Stack that is a particular class
* @param type
* @return
*/
public Broker getAdaptor(Class type);
/**
* Get the id of the broker
@ -46,6 +56,22 @@ public interface Broker extends Region, Service {
* Get the name of the broker
*/
public String getBrokerName();
/**
* A remote Broker connects
* @param contection
* @param info
* @param client
*/
public void addBroker(Connection connection, BrokerInfo info);
/**
* Remove a BrokerInfo
* @param connection
* @param info
*/
public void removeBroker(Connection connection,BrokerInfo info);
/**
* A client is establishing a connection with the broker.
@ -150,4 +176,30 @@ public interface Broker extends Region, Service {
*/
public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Throwable;
/**
* Get the BrokerInfo's of any connected Brokers
* @return array of peer BrokerInfos
*/
BrokerInfo[] getPeerBrokerInfos();
/**
* Notify the Broker that a dispatch has happened
* @param messageDispatch
*/
public void processDispatch(MessageDispatch messageDispatch);
/**
* Notify the Broker of a MessageDispatchNotification
* @param messageDispatchNotification
* @throws Throwable
*/
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Throwable;
/**
*
* @return true if the broker is running as a slave
*/
public boolean isSlaveBroker();
}

View File

@ -17,6 +17,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
@ -31,7 +32,7 @@ import org.apache.activemq.command.TransactionId;
* @version $Revision: 1.10 $
*/
public class BrokerBroadcaster extends BrokerFilter{
protected transient volatile Broker[] listeners=new Broker[0];
protected volatile Broker[] listeners=new Broker[0];
public BrokerBroadcaster(Broker next){
super(next);
@ -202,30 +203,39 @@ public class BrokerBroadcaster extends BrokerFilter{
}
public void gc(){
next.gc();
Broker brokers[]=getListeners();
for(int i=0;i<brokers.length;i++){
brokers[i].gc();
}
next.gc();
}
public void addBroker(Connection connection,BrokerInfo info){
next.addBroker(connection,info);
Broker brokers[]=getListeners();
for(int i=0;i<brokers.length;i++){
brokers[i].addBroker(connection, info);
}
}
protected Broker[] getListeners(){
return listeners;
}
public synchronized void addInteceptor(Broker broker){
List tmp=getInterceptorsAsList();
public synchronized void addListener(Broker broker){
List tmp=getListenersAsList();
tmp.add(broker);
listeners=(Broker[]) tmp.toArray(new Broker[tmp.size()]);
}
public synchronized void removeInterceptor(Broker broker){
List tmp=getInterceptorsAsList();
public synchronized void removeListener(Broker broker){
List tmp=getListenersAsList();
tmp.remove(broker);
listeners=(Broker[]) tmp.toArray(new Broker[tmp.size()]);
}
protected List getInterceptorsAsList(){
protected List getListenersAsList(){
List tmp=new ArrayList();
Broker brokers[]=getListeners();
for(int i=0;i<brokers.length;i++){

View File

@ -19,10 +19,13 @@ package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionInfo;
@ -42,6 +45,15 @@ public class BrokerFilter implements Broker {
this.next=next;
}
public Broker getAdaptor(Class type){
if (type.isInstance(this)){
return this;
}
return next.getAdaptor(type);
}
public void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable {
next.acknowledge(context, ack);
}
@ -146,4 +158,30 @@ public class BrokerFilter implements Broker {
next.gc();
}
public void addBroker(Connection connection,BrokerInfo info){
next.addBroker(connection, info);
}
public void removeBroker(Connection connection,BrokerInfo info){
next.removeBroker(connection, info);
}
public BrokerInfo[] getPeerBrokerInfos(){
return next.getPeerBrokerInfos();
}
public void processDispatch(MessageDispatch messageDispatch){
next.processDispatch(messageDispatch);
}
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Throwable{
next.processDispatchNotification(messageDispatchNotification);
}
public boolean isSlaveBroker(){
return next.isSlaveBroker();
}
}

View File

@ -16,27 +16,19 @@
*/
package org.apache.activemq.broker;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.ft.MasterConnector;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.ConnectorView;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.FTConnectorView;
import org.apache.activemq.broker.jmx.JmsConnectorView;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.jmx.ManagedTransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
@ -62,8 +54,19 @@ import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* Represents a running broker service which consists of a number of transport
@ -82,6 +85,7 @@ public class BrokerService implements Service {
private boolean populateJMSXUserID = false;
private boolean useShutdownHook = true;
private boolean useLoggingForShutdownErrors = false;
private boolean shutdownOnMasterFailure = false;
private String brokerName = "localhost";
private File dataDirectory;
private Broker broker;
@ -96,10 +100,12 @@ public class BrokerService implements Service {
private List proxyConnectors = new CopyOnWriteArrayList();
private List registeredMBeanNames = new CopyOnWriteArrayList();
private List jmsConnectors = new CopyOnWriteArrayList();
private MasterConnector masterConnector;
private Thread shutdownHook;
private String[] transportConnectorURIs;
private String[] networkConnectorURIs;
private String[] proxyConnectorURIs;
private String masterConnectorURI;
private JmsConnector[] jmsBridgeConnectors; //these are Jms to Jms bridges to other jms messaging systems
private boolean deleteAllMessagesOnStartup;
private URI vmConnectorURI;
@ -143,6 +149,7 @@ public class BrokerService implements Service {
* @throws Exception
*/
public TransportConnector addConnector(TransportConnector connector) throws Exception {
int what = System.identityHashCode(connector);
if (isUseJmx()) {
URI discoveryUri = connector.getDiscoveryUri();
connector = connector.asManagedConnector(getManagementContext().getMBeanServer(), getBrokerObjectName());
@ -230,9 +237,12 @@ public class BrokerService implements Service {
return connector;
}
public JmsConnector addJmsConnector(JmsConnector connector){
public JmsConnector addJmsConnector(JmsConnector connector) throws Exception{
connector.setBrokerService(this);
jmsConnectors.add(connector);
if (isUseJmx()) {
registerJmsConnectorMBean(connector);
}
return connector;
}
@ -243,6 +253,64 @@ public class BrokerService implements Service {
return null;
}
public void initializeMasterConnector(URI remoteURI) throws Exception {
if (masterConnector != null){
throw new IllegalStateException("Can only be the Slave to one Master");
}
URI localURI = getVmConnectorURI();
TransportConnector connector = null;
if (!transportConnectors.isEmpty()){
connector = (TransportConnector)transportConnectors.get(0);
}
masterConnector = new MasterConnector(this,connector);
masterConnector.setLocalURI(localURI);
masterConnector.setRemoteURI(remoteURI);
if (isUseJmx()) {
registerFTConnectorMBean(masterConnector);
}
}
/**
* @return Returns the masterConnectorURI.
*/
public String getMasterConnectorURI(){
return masterConnectorURI;
}
/**
* @param masterConnectorURI The masterConnectorURI to set.
*/
public void setMasterConnectorURI(String masterConnectorURI){
this.masterConnectorURI=masterConnectorURI;
}
/**
* @return true if this Broker is a slave to a Master
*/
public boolean isSlave(){
return masterConnector != null && masterConnector.isSlave();
}
public void masterFailed(){
if (shutdownOnMasterFailure){
log.fatal("The Master has failed ... shutting down");
try {
stop();
}catch(Exception e){
log.error("Failed to stop for master failure",e);
}
}else {
log.warn("Master Failed - starting all connectors");
try{
startAllConnectors();
}catch(Exception e){
log.error("Failed to startAllConnectors");
}
}
}
// Service interface
// -------------------------------------------------------------------------
public void start() throws Exception {
@ -269,26 +337,15 @@ public class BrokerService implements Service {
}
getBroker().start();
for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) {
TransportConnector connector = (TransportConnector) iter.next();
connector.start();
}
for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) {
NetworkConnector connector = (NetworkConnector) iter.next();
connector.start();
if (masterConnectorURI!=null){
initializeMasterConnector(new URI(masterConnectorURI));
if (masterConnector!=null){
masterConnector.start();
}
}
for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) {
ProxyConnector connector = (ProxyConnector) iter.next();
connector.start();
}
startAllConnectors();
for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) {
JmsConnector connector = (JmsConnector) iter.next();
connector.start();
}
log.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ") started");
}
@ -303,8 +360,12 @@ public class BrokerService implements Service {
removeShutdownHook();
ServiceStopper stopper = new ServiceStopper();
if (masterConnector != null){
masterConnector.stop();
}
for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) {
TransportConnector connector = (TransportConnector) iter.next();
stopper.stop(connector);
}
@ -630,8 +691,8 @@ public class BrokerService implements Service {
}
}
if (networkConnectorURIs != null) {
for (int i = 0; i < transportConnectorURIs.length; i++) {
String uri = transportConnectorURIs[i];
for (int i = 0; i < networkConnectorURIs.length; i++) {
String uri = networkConnectorURIs[i];
addNetworkConnector(uri);
}
}
@ -641,11 +702,13 @@ public class BrokerService implements Service {
addProxyConnector(uri);
}
}
if (jmsBridgeConnectors != null){
for (int i = 0; i < jmsBridgeConnectors.length; i++){
addJmsConnector(jmsBridgeConnectors[i]);
}
}
}
protected void registerConnectorMBean(TransportConnector connector) throws IOException, URISyntaxException {
@ -701,6 +764,42 @@ public class BrokerService implements Service {
}
}
protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
FTConnectorView view = new FTConnectorView(connector);
Hashtable map = new Hashtable();
map.put("Type", "MasterConnector");
map.put("BrokerName", JMXSupport.encodeObjectNamePart(getBrokerName()));
// map.put("ConnectorName",
// JMXSupport.encodeObjectNamePart(connector.()));
try {
ObjectName objectName = new ObjectName("org.apache.activemq", map);
mbeanServer.registerMBean(view, objectName);
registeredMBeanNames.add(objectName);
}
catch (Throwable e) {
throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
}
}
protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
JmsConnectorView view = new JmsConnectorView(connector);
Hashtable map = new Hashtable();
map.put("Type", "JmsConnector");
map.put("BrokerName", JMXSupport.encodeObjectNamePart(getBrokerName()));
// map.put("ConnectorName",
// JMXSupport.encodeObjectNamePart(connector.()));
try {
ObjectName objectName = new ObjectName("org.apache.activemq", map);
mbeanServer.registerMBean(view, objectName);
registeredMBeanNames.add(objectName);
}
catch (Throwable e) {
throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
}
}
/**
* Factory method to create a new broker
*
@ -733,6 +832,7 @@ public class BrokerService implements Service {
mbeanServer.registerMBean(view, objectName);
registeredMBeanNames.add(objectName);
}
return broker;
@ -751,11 +851,11 @@ public class BrokerService implements Service {
RegionBroker regionBroker = null;
if (isUseJmx()) {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
regionBroker = new ManagedRegionBroker(mbeanServer, getBrokerObjectName(),
regionBroker = new ManagedRegionBroker(this,mbeanServer, getBrokerObjectName(),
getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter(), getDestinationPolicy());
}
else {
regionBroker = new RegionBroker(getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter(),
regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter(),
getDestinationPolicy());
}
regionBroker.setBrokerName(getBrokerName());
@ -884,6 +984,34 @@ public class BrokerService implements Service {
System.err.println("Failed to shut down: " + e);
}
}
/**
* Start all transport and network connections, proxies and bridges
* @throws Exception
*/
protected void startAllConnectors() throws Exception{
if (!isSlave()){
for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) {
TransportConnector connector = (TransportConnector) iter.next();
connector.start();
}
for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) {
NetworkConnector connector = (NetworkConnector) iter.next();
connector.start();
}
for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) {
ProxyConnector connector = (ProxyConnector) iter.next();
connector.start();
}
for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) {
JmsConnector connector = (JmsConnector) iter.next();
connector.start();
}
}
}
public boolean isDeleteAllMessagesOnStartup() {
return deleteAllMessagesOnStartup;
@ -911,4 +1039,18 @@ public class BrokerService implements Service {
public void setVmConnectorURI(URI vmConnectorURI) {
this.vmConnectorURI = vmConnectorURI;
}
/**
* @return Returns the shutdownOnMasterFailure.
*/
public boolean isShutdownOnMasterFailure(){
return shutdownOnMasterFailure;
}
/**
* @param shutdownOnMasterFailure The shutdownOnMasterFailure to set.
*/
public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure){
this.shutdownOnMasterFailure=shutdownOnMasterFailure;
}
}

View File

@ -0,0 +1,185 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
/**
* Dumb implementation - used to be overriden by listeners
*
* @version $Revision$
*/
public class EmptyBroker implements Broker{
public BrokerId getBrokerId(){
return null;
}
public String getBrokerName(){
return null;
}
public Broker getAdaptor(Class type){
if (type.isInstance(this)){
return this;
}
return null;
}
public void addConnection(ConnectionContext context,ConnectionInfo info) throws Throwable{
}
public void removeConnection(ConnectionContext context,ConnectionInfo info,Throwable error) throws Throwable{
}
public void addSession(ConnectionContext context,SessionInfo info) throws Throwable{
}
public void removeSession(ConnectionContext context,SessionInfo info) throws Throwable{
}
public void addProducer(ConnectionContext context,ProducerInfo info) throws Throwable{
}
public void removeProducer(ConnectionContext context,ProducerInfo info) throws Throwable{
}
public Connection[] getClients() throws Throwable{
return null;
}
public ActiveMQDestination[] getDestinations() throws Throwable{
return null;
}
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Throwable{
return null;
}
public void beginTransaction(ConnectionContext context,TransactionId xid) throws Throwable{
}
public int prepareTransaction(ConnectionContext context,TransactionId xid) throws Throwable{
return 0;
}
public void rollbackTransaction(ConnectionContext context,TransactionId xid) throws Throwable{
}
public void commitTransaction(ConnectionContext context,TransactionId xid,boolean onePhase) throws Throwable{
}
public void forgetTransaction(ConnectionContext context,TransactionId transactionId) throws Throwable{
}
public Destination addDestination(ConnectionContext context,ActiveMQDestination destination) throws Throwable{
return null;
}
public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout) throws Throwable{
}
public void addConsumer(ConnectionContext context,ConsumerInfo info) throws Throwable{
}
public void removeConsumer(ConnectionContext context,ConsumerInfo info) throws Throwable{
}
public void removeSubscription(ConnectionContext context,RemoveSubscriptionInfo info) throws Throwable{
}
public void send(ConnectionContext context,Message message) throws Throwable{
}
public void acknowledge(ConnectionContext context,MessageAck ack) throws Throwable{
}
public void gc(){
}
public void start() throws Exception{
}
public void stop() throws Exception{
}
public void addBroker(Connection connection,BrokerInfo info){
}
public void removeBroker(Connection connection,BrokerInfo info){
}
public BrokerInfo[] getPeerBrokerInfos(){
return null;
}
/**
* Notifiy the Broker that a dispatch has happened
* @param messageDispatch
*/
public void processDispatch(MessageDispatch messageDispatch){
}
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification){
}
public boolean isSlaveBroker(){
return false;
}
}

View File

@ -19,10 +19,13 @@ package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionInfo;
@ -41,6 +44,13 @@ public class ErrorBroker implements Broker {
this.message=message;
}
public Broker getAdaptor(Class type){
if (type.isInstance(this)){
return this;
}
return null;
}
public BrokerId getBrokerId() {
throw new IllegalStateException(this.message);
}
@ -144,4 +154,31 @@ public class ErrorBroker implements Broker {
public void stop() throws Exception {
throw new IllegalStateException(this.message);
}
public void addBroker(Connection connection,BrokerInfo info){
throw new IllegalStateException(this.message);
}
public void removeBroker(Connection connection,BrokerInfo info){
throw new IllegalStateException(this.message);
}
public BrokerInfo[] getPeerBrokerInfos(){
throw new IllegalStateException(this.message);
}
public void processDispatch(MessageDispatch messageDispatch){
throw new IllegalStateException(this.message);
}
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification){
throw new IllegalStateException(this.message);
}
public boolean isSlaveBroker(){
throw new IllegalStateException(this.message);
}
}

View File

@ -0,0 +1,41 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
/**
* Inserts itself into the BrokerStack
*
* @version $Revision: 1.10 $
*/
public class InsertableMutableBrokerFilter extends MutableBrokerFilter{
MutableBrokerFilter parent;
public InsertableMutableBrokerFilter(MutableBrokerFilter parent){
super(parent.getNext());
this.parent=parent;
parent.setNext(this);
}
/**
* Remove 'self' from the BrokerStack
*/
public void remove(){
parent.setNext(getNext());
}
}

View File

@ -19,10 +19,13 @@ package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionInfo;
@ -44,6 +47,13 @@ public class MutableBrokerFilter implements Broker {
this.next = next;
}
public Broker getAdaptor(Class type){
if (type.isInstance(this)){
return this;
}
return next.getAdaptor(type);
}
public Broker getNext() {
synchronized(mutext) {
return next;
@ -160,4 +170,28 @@ public class MutableBrokerFilter implements Broker {
getNext().gc();
}
public void addBroker(Connection connection,BrokerInfo info){
getNext().addBroker(connection, info);
}
public void removeBroker(Connection connection,BrokerInfo info){
getNext().removeBroker(connection, info);
}
public BrokerInfo[] getPeerBrokerInfos(){
return getNext().getPeerBrokerInfos();
}
public void processDispatch(MessageDispatch messageDispatch){
getNext().processDispatch(messageDispatch);
}
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Throwable{
getNext().processDispatchNotification(messageDispatchNotification);
}
public boolean isSlaveBroker(){
return getNext().isSlaveBroker();
}
}

View File

@ -18,19 +18,23 @@ package org.apache.activemq.broker;
import java.io.IOException;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
*
* @version $Revision: 1.8 $
*/
public class TransportConnection extends AbstractConnection {
private static final Log log = LogFactory.getLog(TransportConnection.class);
private final Transport transport;
private boolean slow;
private boolean markedCandidate;
@ -39,6 +43,7 @@ public class TransportConnection extends AbstractConnection {
private boolean connected;
private boolean active;
private long timeStamp=0;
private MasterBroker masterBroker; //used if this connection is used by a Slave
/**
* @param connector
@ -71,10 +76,14 @@ public class TransportConnection extends AbstractConnection {
}
public void stop() throws Exception {
try {
if (masterBroker != null){
masterBroker.stop();
}
transport.oneway(new ShutdownInfo());
} catch (IOException ignore) {
Thread.sleep(1000);
} catch (Exception ignore) {
//ignore.printStackTrace();
}
transport.stop();
@ -181,6 +190,19 @@ public class TransportConnection extends AbstractConnection {
this.active=active;
}
public Response processBrokerInfo(BrokerInfo info) {
if (info.isSlaveBroker()){
//stream messages from this broker (the master) to
//the slave
MutableBrokerFilter parent = (MutableBrokerFilter)broker.getAdaptor(MutableBrokerFilter.class);
masterBroker = new MasterBroker(parent,transport);
masterBroker.startProcessing();
log.info("Slave Broker " + info.getBrokerName() + " is attached");
}
return super.processBrokerInfo(info);
}
protected void dispatch(Command command){

View File

@ -113,6 +113,7 @@ public class TransportConnector implements Connector {
public void setBroker(Broker broker) {
this.broker = broker;
brokerInfo.setBrokerId(broker.getBrokerId());
brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
}
public void setBrokerName(String brokerName) {
@ -121,6 +122,7 @@ public class TransportConnector implements Connector {
public void setServer(TransportServer server) {
this.server = server;
this.brokerInfo.setBrokerURL(server.getConnectURI().toString());
this.server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(Transport transport) {
try {

View File

@ -0,0 +1,256 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.InsertableMutableBrokerFilter;
import org.apache.activemq.broker.MutableBrokerFilter;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.Transport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* The Message Broker which passes messages to a slave
*
* @version $Revision: 1.8 $
*/
public class MasterBroker extends InsertableMutableBrokerFilter{
private static final Log log=LogFactory.getLog(MasterBroker.class);
private Transport slave;
private AtomicBoolean started=new AtomicBoolean(false);
public MasterBroker(MutableBrokerFilter parent,Transport slave){
super(parent);
this.slave=slave;
}
public void startProcessing(){
started.set(true);
}
public void stop() throws Exception{
super.stop();
stopProcessing();
}
public void stopProcessing(){
if (started.compareAndSet(true,false)){
remove();
}
}
/**
* A client is establishing a connection with the broker.
* @param context
* @param info
* @param client
*/
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Throwable{
super.addConnection(context,info);
sendAsyncToSlave(info);
}
/**
* A client is disconnecting from the broker.
* @param context the environment the operation is being executed under.
* @param info
* @param client
* @param error null if the client requested the disconnect or the error that caused the client to disconnect.
*/
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Throwable{
super.removeConnection(context,info,error);
sendAsyncToSlave(new RemoveInfo(info.getConnectionId()));
}
/**
* Adds a session.
* @param context
* @param info
* @throws Throwable
*/
public void addSession(ConnectionContext context, SessionInfo info) throws Throwable{
super.addSession(context, info);
sendAsyncToSlave(info);
}
/**
* Removes a session.
* @param context
* @param info
* @throws Throwable
*/
public void removeSession(ConnectionContext context, SessionInfo info) throws Throwable{
super.removeSession(context, info);
sendAsyncToSlave(new RemoveInfo(info.getSessionId()));
}
/**
* Adds a producer.
* @param context the enviorment the operation is being executed under.
*/
public void addProducer(ConnectionContext context, ProducerInfo info) throws Throwable{
super.addProducer(context,info);
sendAsyncToSlave(info);
}
/**
* Removes a producer.
* @param context the enviorment the operation is being executed under.
*/
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Throwable{
super.removeProducer(context, info);
sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
}
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Throwable{
super.beginTransaction(context, xid);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.BEGIN);
sendAsyncToSlave(info);
}
/**
* Prepares a transaction. Only valid for xa transactions.
* @param client
* @param xid
* @return
*/
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Throwable{
int result = super.prepareTransaction(context, xid);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.PREPARE);
sendAsyncToSlave(info);
return result;
}
/**
* Rollsback a transaction.
* @param client
* @param xid
*/
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Throwable{
super.rollbackTransaction(context, xid);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK);
sendAsyncToSlave(info);
}
/**
* Commits a transaction.
* @param client
* @param xid
* @param onePhase
*/
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Throwable{
super.commitTransaction(context, xid,onePhase);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE);
sendAsyncToSlave(info);
}
/**
* Forgets a transaction.
* @param client
* @param xid
* @param onePhase
* @throws Throwable
*/
public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Throwable{
super.forgetTransaction(context, xid);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.FORGET);
sendAsyncToSlave(info);
}
/**
* Notifiy the Broker that a dispatch has happened
* @param messageDispatch
*/
public void processDispatch(MessageDispatch messageDispatch){
super.processDispatch(messageDispatch);
MessageDispatchNotification mdn = new MessageDispatchNotification();
mdn.setConsumerId(messageDispatch.getConsumerId());
mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
mdn.setDestination(messageDispatch.getDestination());
mdn.setMessageId(messageDispatch.getMessage().getMessageId());
sendAsyncToSlave(mdn);
}
public void send(ConnectionContext context, Message message) throws Throwable{
super.send(context,message);
sendAsyncToSlave(message);
}
public void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable{
super.acknowledge(context, ack);
sendAsyncToSlave(ack);
}
protected void sendToSlave(Message message){
/*
if (message.isPersistent()){
sendSyncToSlave(message);
}else{
sendAsyncToSlave(message);
}
*/
sendAsyncToSlave(message);
}
protected void sendAsyncToSlave(Command command){
try{
slave.oneway(command);
}catch(Throwable e){
log.error("Slave Failed",e);
stopProcessing();
}
}
protected void sendSyncToSlave(Command command){
try{
Response response=slave.request(command);
if (response.isException()){
ExceptionResponse er=(ExceptionResponse)response;
log.error("Slave Failed",er.getException());
}
}catch(Throwable e){
log.error("Slave Failed",e);
}
}
}

View File

@ -0,0 +1,229 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.net.URI;
/**
* Used by a Slave Broker to Connect to the Master
*
* @version $Revision$
*/
public class MasterConnector implements Service{
private static final Log log=LogFactory.getLog(MasterConnector.class);
private BrokerService broker;
private URI remoteURI;
private URI localURI;
private Transport localBroker;
private Transport remoteBroker;
private TransportConnector connector;
private AtomicBoolean masterActive = new AtomicBoolean(false);
IdGenerator idGenerator=new IdGenerator();
ConnectionInfo connectionInfo;
SessionInfo sessionInfo;
ProducerInfo producerInfo;
public MasterConnector(BrokerService broker,TransportConnector connector){
this.broker = broker;
this.connector = connector;
}
public boolean isSlave(){
return masterActive.get();
}
public void start() throws Exception{
localBroker=TransportFactory.connect(localURI);
remoteBroker=TransportFactory.connect(remoteURI);
log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established.");
localBroker.setTransportListener(new TransportListener(){
public void onCommand(Command command){
}
public void onException(IOException error){
serviceLocalException(error);
}
});
remoteBroker.setTransportListener(new TransportListener(){
public void onCommand(Command command){
serviceRemoteCommand(command);
}
public void onException(IOException error){
serviceRemoteException(error);
}
});
masterActive.set(true);
Thread thead=new Thread(){
public void run(){
try{
localBroker.start();
remoteBroker.start();
startBridge();
}catch(Exception e){
masterActive.set(false);
log.error("Failed to start network bridge: "+e,e);
}
}
};
thead.start();
}
protected void startBridge() throws Exception{
connectionInfo=new ConnectionInfo();
connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
connectionInfo.setClientId(idGenerator.generateId());
localBroker.oneway(connectionInfo);
remoteBroker.oneway(connectionInfo);
sessionInfo=new SessionInfo(connectionInfo,1);
localBroker.oneway(sessionInfo);
remoteBroker.oneway(sessionInfo);
producerInfo=new ProducerInfo(sessionInfo,1);
producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo);
BrokerInfo brokerInfo = null;
if (connector != null){
brokerInfo = connector.getBrokerInfo();
}else{
brokerInfo = new BrokerInfo();
}
brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
brokerInfo.setSlaveBroker(true);
remoteBroker.oneway(brokerInfo);
log.info("Slave connection between "+localBroker+" and "+remoteBroker+" has been established.");
}
public void stop() throws Exception{
masterActive.set(false);
try{
if (connectionInfo!=null){
localBroker.request(connectionInfo.createRemoveCommand());
}
localBroker.setTransportListener(null);
remoteBroker.setTransportListener(null);
remoteBroker.oneway(new ShutdownInfo());
localBroker.oneway(new ShutdownInfo());
}catch(IOException e){
log.debug("Caught exception stopping",e);
}finally{
ServiceStopper ss=new ServiceStopper();
ss.stop(localBroker);
ss.stop(remoteBroker);
ss.throwFirstException();
}
}
protected void serviceRemoteException(IOException error){
log.error("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
shutDown();
}
protected void serviceRemoteCommand(Command command){
try{
if (command.isMessageDispatch()){
MessageDispatch md=(MessageDispatch)command;
command=md.getMessage();
}
if (command.getDataStructureType()==CommandTypes.SHUTDOWN_INFO){
log.warn("The Master has shutdown");
shutDown();
}else {
localBroker.oneway(command);
}
}catch(IOException e){
serviceRemoteException(e);
}
}
protected void serviceLocalException(Throwable error){
log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
ServiceSupport.dispose(this);
}
/**
* @return Returns the localURI.
*/
public URI getLocalURI(){
return localURI;
}
/**
* @param localURI
* The localURI to set.
*/
public void setLocalURI(URI localURI){
this.localURI=localURI;
}
/**
* @return Returns the remoteURI.
*/
public URI getRemoteURI(){
return remoteURI;
}
/**
* @param remoteURI
* The remoteURI to set.
*/
public void setRemoteURI(URI remoteURI){
this.remoteURI=remoteURI;
}
private void shutDown(){
masterActive.set(false);
broker.masterFailed();
ServiceSupport.dispose(this);
}
}

View File

@ -0,0 +1,36 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.broker.ft.MasterConnector;
public class FTConnectorView implements FTConnectorViewMBean {
private final MasterConnector connector;
public FTConnectorView(MasterConnector connector) {
this.connector = connector;
}
public void start() throws Exception {
connector.start();
}
public void stop() throws Exception {
connector.stop();
}
}

View File

@ -0,0 +1,23 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.Service;
public interface FTConnectorViewMBean extends Service {
}

View File

@ -0,0 +1,36 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.network.jms.JmsConnector;
public class JmsConnectorView implements JmsConnectorViewMBean {
private final JmsConnector connector;
public JmsConnectorView(JmsConnector connector) {
this.connector = connector;
}
public void start() throws Exception {
connector.start();
}
public void stop() throws Exception {
connector.stop();
}
}

View File

@ -0,0 +1,23 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.Service;
public interface JmsConnectorViewMBean extends Service {
}

View File

@ -31,7 +31,7 @@ public class ManagedQueueRegion extends QueueRegion {
private final ManagedRegionBroker regionBroker;
public ManagedQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter, PolicyMap policyMap) {
super(destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter, policyMap);
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter, policyMap);
regionBroker = broker;
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker;
@ -37,8 +38,8 @@ public class ManagedRegionBroker extends RegionBroker {
private final MBeanServer mbeanServer;
private final ObjectName brokerObjectName;
public ManagedRegionBroker(MBeanServer mbeanServer, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter, PolicyMap policyMap) throws IOException {
super(taskRunnerFactory, memoryManager, adapter, policyMap);
public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter, PolicyMap policyMap) throws IOException {
super(brokerService,taskRunnerFactory, memoryManager, adapter, policyMap);
this.mbeanServer = mbeanServer;
this.brokerObjectName = brokerObjectName;
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
@ -30,7 +31,7 @@ public class ManagedTempQueueRegion extends TempQueueRegion {
public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
super(destinationStatistics, memoryManager, taskRunnerFactory);
super(regionBroker,destinationStatistics, memoryManager, taskRunnerFactory);
this.regionBroker = regionBroker;
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
@ -29,7 +30,7 @@ public class ManagedTempTopicRegion extends TempTopicRegion {
private final ManagedRegionBroker regionBroker;
public ManagedTempTopicRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
super(destinationStatistics, memoryManager, taskRunnerFactory);
super(regionBroker,destinationStatistics, memoryManager, taskRunnerFactory);
this.regionBroker = regionBroker;
}

View File

@ -31,7 +31,7 @@ public class ManagedTopicRegion extends TopicRegion {
private final ManagedRegionBroker regionBroker;
public ManagedTopicRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter, PolicyMap policyMap) {
super(destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter, policyMap);
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter, policyMap);
regionBroker = broker;
}

View File

@ -21,11 +21,13 @@ import java.util.Set;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.memory.UsageManager;
@ -50,11 +52,13 @@ abstract public class AbstractRegion implements Region {
protected final UsageManager memoryManager;
protected final PersistenceAdapter persistenceAdapter;
protected final DestinationStatistics destinationStatistics;
protected final Broker broker;
protected boolean autoCreateDestinations=true;
protected final TaskRunnerFactory taskRunnerFactory;
protected final Object destinationsMutex = new Object();
public AbstractRegion(DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
public AbstractRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
this.broker = broker;
this.destinationStatistics = destinationStatistics;
this.memoryManager = memoryManager;
this.taskRunnerFactory = taskRunnerFactory;
@ -206,6 +210,12 @@ abstract public class AbstractRegion implements Region {
}
}
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Throwable{
Subscription sub = (Subscription) subscriptions.get(messageDispatchNotification.getConsumerId());
if (sub != null){
sub.processMessageDispatchNotification(messageDispatchNotification);
}
}
public void gc() {
for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
Subscription sub = (Subscription) iter.next();

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker.region;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
@ -38,6 +39,7 @@ abstract public class AbstractSubscription implements Subscription {
static private final Log log = LogFactory.getLog(AbstractSubscription.class);
protected Broker broker;
protected ConnectionContext context;
protected ConsumerInfo info;
final protected DestinationFilter destinationFilter;
@ -45,7 +47,8 @@ abstract public class AbstractSubscription implements Subscription {
final protected CopyOnWriteArrayList destinations = new CopyOnWriteArrayList();
public AbstractSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
this.broker = broker;
this.context = context;
this.info = info;
this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
@ -106,4 +109,8 @@ abstract public class AbstractSubscription implements Subscription {
public void gc() {
}
public boolean isSlaveBroker(){
return broker.isSlaveBroker();
}
}

View File

@ -21,6 +21,7 @@ import java.util.Iterator;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
@ -39,14 +40,14 @@ public class DurableTopicSubscription extends PrefetchSubscription {
boolean active=true;
boolean recovered=true;
public DurableTopicSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(context, info);
public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(broker,context, info);
this.clientId = context.getClientId();
this.subscriptionName = info.getSubcriptionName();
}
public DurableTopicSubscription(SubscriptionInfo info) throws InvalidSelectorException {
super(null, createFakeConsumerInfo(info));
public DurableTopicSubscription(Broker broker,SubscriptionInfo info) throws InvalidSelectorException {
super(broker,null, createFakeConsumerInfo(info));
this.clientId = info.getClientId();
this.subscriptionName = info.getSubcriptionName();
active=false;

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
@ -23,6 +24,7 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.transaction.Synchronization;
import org.apache.commons.logging.Log;
@ -52,15 +54,39 @@ abstract public class PrefetchSubscription extends AbstractSubscription {
int preLoadSize=0;
boolean dispatching=false;
public PrefetchSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(context, info);
public PrefetchSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(broker,context, info);
}
synchronized public void add(MessageReference node) throws Throwable {
if( !isFull() ) {
if( !isFull() && !isSlaveBroker()) {
dispatch(node);
} else {
matched.addLast(node);
synchronized(matched){
matched.addLast(node);
}
}
}
public void processMessageDispatchNotification(MessageDispatchNotification mdn){
synchronized(matched){
for (Iterator i = matched.iterator(); i.hasNext();){
MessageReference node = (MessageReference)i.next();
if (node.getMessageId().equals(mdn.getMessageId())){
i.remove();
try {
MessageDispatch md = createMessageDispatch(node, node.getMessage());
dispatched.addLast(node);
incrementPreloadSize(node.getMessage().getSize());
node.decrementReferenceCount();
}catch(Exception e){
log.error("Problem processing MessageDispatchNotification: " + mdn,e);
}
break;
}
}
}
}
@ -244,6 +270,8 @@ abstract public class PrefetchSubscription extends AbstractSubscription {
}
}
}
private void dispatch(final MessageReference node) throws IOException {
node.incrementReferenceCount();
@ -254,7 +282,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription {
}
// Make sure we can dispatch a message.
if( canDispatch(node) ) {
if( canDispatch(node) && !isSlaveBroker()) {
MessageDispatch md = createMessageDispatch(node, message);
dispatched.addLast(node);

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker.region;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
@ -28,8 +29,8 @@ public class QueueBrowserSubscription extends PrefetchSubscription {
boolean browseDone;
public QueueBrowserSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(context, info);
public QueueBrowserSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(broker,context, info);
}
protected boolean canDispatch(MessageReference node) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@ -40,9 +41,9 @@ public class QueueRegion extends AbstractRegion {
private final PolicyMap policyMap;
public QueueRegion(DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
public QueueRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter persistenceAdapter, PolicyMap policyMap) {
super(destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
this.policyMap = policyMap;
}
@ -71,10 +72,10 @@ public class QueueRegion extends AbstractRegion {
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
if (info.isBrowser()) {
return new QueueBrowserSubscription(context, info);
return new QueueBrowserSubscription(broker,context, info);
}
else {
return new QueueSubscription(context, info);
return new QueueSubscription(broker,context, info);
}
}

View File

@ -16,10 +16,7 @@
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.command.ConsumerId;
@ -27,12 +24,14 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.transaction.Synchronization;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import javax.jms.InvalidSelectorException;
import java.io.IOException;
public class QueueSubscription extends PrefetchSubscription {
public QueueSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(context, info);
public QueueSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(broker,context, info);
}
public void add(MessageReference node) throws Throwable {

View File

@ -22,6 +22,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.RemoveSubscriptionInfo;
/**
@ -86,6 +87,13 @@ public interface Region extends Service {
* @param context the environment the operation is being executed under.
*/
public void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable;
/**
* Process a notification of a dispatch - used by a Slave Broker
* @param messageDispatchNotification
* @throws Throwable
*/
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Throwable;
public void gc();

View File

@ -20,15 +20,19 @@ import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionInfo;
@ -46,8 +50,8 @@ import javax.jms.JMSException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.*;
import java.util.Set;
import java.util.HashMap;
import java.util.Map;
/**
* Routes Broker operations to the correct messaging regions for processing.
@ -62,22 +66,25 @@ public class RegionBroker implements Broker {
private final Region topicRegion;
private final Region tempQueueRegion;
private final Region tempTopicRegion;
private BrokerService brokerService;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList();
private final CopyOnWriteArraySet destinations = new CopyOnWriteArraySet();
private final CopyOnWriteArrayList brokerInfos = new CopyOnWriteArrayList();
private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
private BrokerId brokerId;
private String brokerName;
private Map clientIdSet = new HashMap(); // we will synchronize access
public RegionBroker(TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws IOException {
this(taskRunnerFactory, memoryManager, createDefaultPersistenceAdapter(memoryManager), null);
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws IOException {
this(brokerService,taskRunnerFactory, memoryManager, createDefaultPersistenceAdapter(memoryManager), null);
}
public RegionBroker(TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter, PolicyMap policyMap) throws IOException {
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter, PolicyMap policyMap) throws IOException {
this.brokerService = brokerService;
this.sequenceGenerator.setLastSequenceId( adapter.getLastMessageBrokerSequenceId() );
queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, adapter, policyMap);
@ -86,21 +93,28 @@ public class RegionBroker implements Broker {
tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory);
tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory);
}
public Broker getAdaptor(Class type){
if (type.isInstance(this)){
return this;
}
return null;
}
protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
return new TempTopicRegion(destinationStatistics, memoryManager, taskRunnerFactory);
return new TempTopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory);
}
protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
return new TempQueueRegion(destinationStatistics, memoryManager, taskRunnerFactory);
return new TempQueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory);
}
protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter, PolicyMap policyMap) {
return new TopicRegion(destinationStatistics, memoryManager, taskRunnerFactory, adapter, policyMap);
return new TopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, adapter, policyMap);
}
protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter, PolicyMap policyMap) {
return new QueueRegion(destinationStatistics, memoryManager, taskRunnerFactory, adapter, policyMap);
return new QueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, adapter, policyMap);
}
private static PersistenceAdapter createDefaultPersistenceAdapter(UsageManager memoryManager) throws IOException {
@ -279,7 +293,6 @@ public class RegionBroker implements Broker {
}
public void send(ConnectionContext context, Message message) throws Throwable {
message.getMessageId().setBrokerSequenceId(sequenceGenerator.getNextSequenceId());
ActiveMQDestination destination = message.getDestination();
switch(destination.getDestinationType()) {
@ -386,4 +399,50 @@ public class RegionBroker implements Broker {
throw new JMSException("Unknown destination type: " + destination.getDestinationType());
}
public synchronized void addBroker(Connection connection,BrokerInfo info){
brokerInfos.add(info);
}
public synchronized void removeBroker(Connection connection,BrokerInfo info){
if (info != null){
brokerInfos.remove(info);
}
}
public synchronized BrokerInfo[] getPeerBrokerInfos(){
BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
result = (BrokerInfo[])brokerInfos.toArray(result);
return result;
}
public void processDispatch(MessageDispatch messageDispatch){
}
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Throwable {
ActiveMQDestination destination = messageDispatchNotification.getDestination();
switch(destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.processDispatchNotification(messageDispatchNotification);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.processDispatchNotification(messageDispatchNotification);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.processDispatchNotification(messageDispatchNotification);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.processDispatchNotification(messageDispatchNotification);
break;
default:
throwUnknownDestinationType(destination);
}
}
public boolean isSlaveBroker(){
return brokerService.isSlave();
}
}

View File

@ -22,6 +22,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.filter.MessageEvaluationContext;
/**
@ -87,4 +88,15 @@ public interface Subscription {
*/
void gc();
/**
* Used by a Slave Broker to update dispatch infomation
* @param mdn
*/
void processMessageDispatchNotification(MessageDispatchNotification mdn);
/**
* @return true if the broker is currently in slave mode
*/
boolean isSlaveBroker();
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker.region;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
@ -32,8 +33,8 @@ import org.apache.activemq.thread.TaskRunnerFactory;
*/
public class TempQueueRegion extends AbstractRegion {
public TempQueueRegion(DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
super(destinationStatistics, memoryManager, taskRunnerFactory, null);
public TempQueueRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, null);
setAutoCreateDestinations(false);
}
@ -55,9 +56,9 @@ public class TempQueueRegion extends AbstractRegion {
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
if( info.isBrowser() ) {
return new QueueBrowserSubscription(context, info);
return new QueueBrowserSubscription(broker,context, info);
} else {
return new QueueSubscription(context, info);
return new QueueSubscription(broker,context, info);
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker.region;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
@ -31,8 +32,8 @@ import org.apache.activemq.thread.TaskRunnerFactory;
*/
public class TempTopicRegion extends AbstractRegion {
public TempTopicRegion(DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
super(destinationStatistics, memoryManager, taskRunnerFactory, null);
public TempTopicRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, null);
setAutoCreateDestinations(false);
}
@ -56,7 +57,7 @@ public class TempTopicRegion extends AbstractRegion {
if( info.isDurable() ) {
throw new JMSException("A durable subscription cannot be created for a temporary topic.");
} else {
return new TopicSubscription(context, info, this.memoryManager);
return new TopicSubscription(broker,context, info, this.memoryManager);
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker.region;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@ -50,9 +51,9 @@ public class TopicRegion extends AbstractRegion {
protected final ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap();
private final PolicyMap policyMap;
public TopicRegion(DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
public TopicRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter persistenceAdapter, PolicyMap policyMap) {
super(destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
this.policyMap = policyMap;
}
@ -168,7 +169,7 @@ public class TopicRegion extends AbstractRegion {
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
if (sub == null) {
sub = new DurableTopicSubscription(context, info);
sub = new DurableTopicSubscription(broker,context, info);
durableSubscriptions.put(key, sub);
}
else {
@ -177,14 +178,14 @@ public class TopicRegion extends AbstractRegion {
return sub;
}
else {
return new TopicSubscription(context, info, memoryManager);
return new TopicSubscription(broker,context, info, memoryManager);
}
}
public Subscription createDurableSubscription(SubscriptionInfo info) throws JMSException {
SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubcriptionName());
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
sub = new DurableTopicSubscription(info);
sub = new DurableTopicSubscription(broker,info);
durableSubscriptions.put(key, sub);
return sub;
}

View File

@ -23,6 +23,7 @@ import java.util.LinkedList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@ -30,6 +31,7 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.transaction.Synchronization;
@ -41,20 +43,36 @@ public class TopicSubscription extends AbstractSubscription {
protected int dispatched=0;
protected int delivered=0;
public TopicSubscription(ConnectionContext context, ConsumerInfo info, UsageManager usageManager) throws InvalidSelectorException {
super(context, info);
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, UsageManager usageManager) throws InvalidSelectorException {
super(broker,context, info);
this.usageManager=usageManager;
}
public void add(MessageReference node) throws InterruptedException, IOException {
node.incrementReferenceCount();
if( !isFull() ) {
if( !isFull() && !isSlaveBroker() ) {
dispatch(node);
} else {
synchronized(matched){
matched.addLast(node);
}
}
}
public void processMessageDispatchNotification(MessageDispatchNotification mdn){
synchronized(matched){
for (Iterator i = matched.iterator(); i.hasNext();){
MessageReference node = (MessageReference)i.next();
if (node.getMessageId().equals(mdn.getMessageId())){
i.remove();
dispatched++;
node.decrementReferenceCount();
break;
}
}
}
}
public void acknowledge(final ConnectionContext context, final MessageAck ack) throws Throwable {
// Handle the standard acknowledgment case.

View File

@ -60,6 +60,10 @@ abstract public class BaseCommand implements Command {
public boolean isMessageAck() {
return false;
}
public boolean isMessageDispatchNotification(){
return false;
}
/**
* @openwire:property version=1

View File

@ -42,6 +42,7 @@ public interface Command extends DataStructure {
boolean isWireFormatInfo();
boolean isMessage();
boolean isMessageAck();
boolean isMessageDispatchNotification();
Response visit( CommandVisitor visitor) throws Throwable;
}

View File

@ -110,6 +110,14 @@ public interface CommandTypes {
byte BOOLEAN_TYPE = 78;
byte BYTE_ARRAY_TYPE = 79;
///////////////////////////////////////////////////
//
// Broker to Broker command objects
//
///////////////////////////////////////////////////
byte MESSAGE_DISPATCH_NOTIFICATION = 90;
///////////////////////////////////////////////////
//
// Data structures contained in the command objects.
@ -129,6 +137,8 @@ public interface CommandTypes {
byte CONSUMER_ID = 122;
byte PRODUCER_ID = 123;
byte BROKER_ID = 124;
}

View File

@ -76,4 +76,8 @@ public class KeepAliveInfo implements Command {
return false;
}
public boolean isMessageDispatchNotification(){
return false;
}
}

View File

@ -0,0 +1,90 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.command;
import org.apache.activemq.state.CommandVisitor;
/**
*
* @openwire:marshaller code="90"
* @version $Revision$
*/
public class MessageDispatchNotification extends BaseCommand{
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.MESSAGE_DISPATCH_NOTIFICATION;
protected ConsumerId consumerId;
protected ActiveMQDestination destination;
protected MessageId messageId;
protected long deliverySequenceId;
public byte getDataStructureType(){
return DATA_STRUCTURE_TYPE;
}
public boolean isMessageDispatchNotification(){
return true;
}
/**
* @openwire:property version=1 cache=true
*/
public ConsumerId getConsumerId(){
return consumerId;
}
public void setConsumerId(ConsumerId consumerId){
this.consumerId=consumerId;
}
/**
* @openwire:property version=1 cache=true
*/
public ActiveMQDestination getDestination(){
return destination;
}
public void setDestination(ActiveMQDestination destination){
this.destination=destination;
}
/**
* @openwire:property version=1
*/
public long getDeliverySequenceId(){
return deliverySequenceId;
}
public void setDeliverySequenceId(long deliverySequenceId){
this.deliverySequenceId=deliverySequenceId;
}
public Response visit(CommandVisitor visitor) throws Throwable {
return visitor.processMessageDispatchNotification( this );
}
/**
* @openwire:property version=1
*/
public MessageId getMessageId(){
return messageId;
}
public void setMessageId(MessageId messageId){
this.messageId=messageId;
}
}

View File

@ -169,4 +169,8 @@ public class WireFormatInfo implements Command {
return false;
}
public boolean isMessageDispatchNotification(){
return false;
}
}

View File

@ -58,8 +58,8 @@ public class MarshallerFactory {
add(new ConnectionErrorMarshaller());
add(new ActiveMQObjectMessageMarshaller());
add(new ConsumerInfoMarshaller());
add(new ConnectionIdMarshaller());
add(new ActiveMQTempTopicMarshaller());
add(new ConnectionIdMarshaller());
add(new DiscoveryEventMarshaller());
add(new ConnectionInfoMarshaller());
add(new KeepAliveInfoMarshaller());
@ -74,6 +74,7 @@ public class MarshallerFactory {
add(new ProducerInfoMarshaller());
add(new SubscriptionInfoMarshaller());
add(new ActiveMQMapMessageMarshaller());
add(new MessageDispatchNotificationMarshaller());
add(new SessionInfoMarshaller());
add(new ActiveMQMessageMarshaller());
add(new TransactionInfoMarshaller());

View File

@ -0,0 +1,108 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire.v1;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;
/**
* Marshalling code for Open Wire Format for MessageDispatchNotification
*
*
* NOTE!: This file is auto generated - do not modify!
* if you need to make a change, please see the modify the groovy scripts in the
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
* @version $Revision$
*/
public class MessageDispatchNotificationMarshaller extends BaseCommandMarshaller {
/**
* Return the type of Data Structure we marshal
* @return short representation of the type data structure
*/
public byte getDataStructureType() {
return MessageDispatchNotification.DATA_STRUCTURE_TYPE;
}
/**
* @return a new object instance
*/
public DataStructure createObject() {
return new MessageDispatchNotification();
}
/**
* Un-marshal an object instance from the data input stream
*
* @param o the object to un-marshal
* @param dataIn the data input stream to build the object from
* @throws IOException
*/
public void unmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn, BooleanStream bs) throws IOException {
super.unmarshal(wireFormat, o, dataIn, bs);
MessageDispatchNotification info = (MessageDispatchNotification)o;
info.setConsumerId((org.apache.activemq.command.ConsumerId) unmarsalCachedObject(wireFormat, dataIn, bs));
info.setDestination((org.apache.activemq.command.ActiveMQDestination) unmarsalCachedObject(wireFormat, dataIn, bs));
info.setDeliverySequenceId(unmarshalLong(wireFormat, dataIn, bs));
info.setMessageId((org.apache.activemq.command.MessageId) unmarsalNestedObject(wireFormat, dataIn, bs));
}
/**
* Write the booleans that this object uses to a BooleanStream
*/
public int marshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {
MessageDispatchNotification info = (MessageDispatchNotification)o;
int rc = super.marshal1(wireFormat, o, bs);
rc += marshal1CachedObject(wireFormat, info.getConsumerId(), bs);
rc += marshal1CachedObject(wireFormat, info.getDestination(), bs);
rc+=marshal1Long(wireFormat, info.getDeliverySequenceId(), bs);
rc += marshal1NestedObject(wireFormat, info.getMessageId(), bs);
return rc+0;
}
/**
* Write a object instance to data output stream
*
* @param o the instance to be marshaled
* @param dataOut the output stream
* @throws IOException thrown if an error occurs
*/
public void marshal2(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut, BooleanStream bs) throws IOException {
super.marshal2(wireFormat, o, dataOut, bs);
MessageDispatchNotification info = (MessageDispatchNotification)o;
marshal2CachedObject(wireFormat, info.getConsumerId(), dataOut, bs);
marshal2CachedObject(wireFormat, info.getDestination(), dataOut, bs);
marshal2Long(wireFormat, info.getDeliverySequenceId(), dataOut, bs);
marshal2NestedObject(wireFormat, info.getMessageId(), dataOut, bs);
}
}

View File

@ -26,6 +26,7 @@ import org.apache.activemq.command.FlushCommand;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
@ -70,6 +71,7 @@ public interface CommandVisitor {
Response processRecoverTransactions(TransactionInfo info) throws Throwable;
Response processForgetTransaction(TransactionInfo info) throws Throwable;
Response processEndTransaction(TransactionInfo info) throws Throwable;
Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Throwable;
}

View File

@ -30,6 +30,7 @@ import org.apache.activemq.command.FlushCommand;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
@ -272,6 +273,10 @@ public class ConnectionStateTracker implements CommandVisitor {
public Response processFlush(FlushCommand command) throws Throwable {
return null;
}
public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Throwable{
return null;
}
public boolean isRestoreConsumers() {
return restoreConsumers;

View File

@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.state.ConnectionStateTracker;
@ -69,6 +70,8 @@ public class FailoverTransport implements CompositeTransport {
private long maxReconnectDelay = 1000 * 30;
private long backOffMultiplier = 2;
private boolean useExponentialBackOff = true;
private boolean randomize = true;
private boolean initialized;
private int maxReconnectAttempts;
private int connectFailures;
private long reconnectDelay = initialReconnectDelay;
@ -79,6 +82,20 @@ public class FailoverTransport implements CompositeTransport {
if (command.isResponse()) {
requestMap.remove(new Short(((Response) command).getCorrelationId()));
}
if (!initialized){
if (command.isBrokerInfo()){
BrokerInfo info = (BrokerInfo)command;
BrokerInfo[] peers = info.getPeerBrokerInfos();
if (peers!= null){
for (int i =0; i < peers.length;i++){
String brokerString = peers[i].getBrokerURL();
add(brokerString);
}
}
initialized = true;
}
}
transportListener.onCommand(command);
}
@ -173,6 +190,7 @@ public class FailoverTransport implements CompositeTransport {
synchronized (reconnectMutex) {
log.debug("Transport failed, starting up reconnect task", e);
if (connectedTransport != null) {
initialized = false;
ServiceSupport.dispose(connectedTransport);
connectedTransport = null;
connectedTransportURI = null;
@ -256,6 +274,20 @@ public class FailoverTransport implements CompositeTransport {
this.maxReconnectAttempts = maxReconnectAttempts;
}
/**
* @return Returns the randomize.
*/
public boolean isRandomize(){
return randomize;
}
/**
* @param randomize The randomize to set.
*/
public void setRandomize(boolean randomize){
this.randomize=randomize;
}
public void oneway(Command command) throws IOException {
Exception error = null;
try {
@ -335,6 +367,19 @@ public class FailoverTransport implements CompositeTransport {
}
reconnect();
}
public void add(String u){
try {
URI uri = new URI(u);
if (!uris.contains(uri))
uris.add(uri);
reconnect();
}catch(Exception e){
log.error("Failed to parse URI: " + u);
}
}
public void reconnect() {
log.debug("Waking up reconnect task");
@ -345,17 +390,18 @@ public class FailoverTransport implements CompositeTransport {
}
}
private ArrayList getConnectList() {
ArrayList l = new ArrayList(uris);
// Randomly, reorder the list by random swapping
Random r = new Random();
r.setSeed(System.currentTimeMillis());
for (int i = 0; i < l.size(); i++) {
int p = r.nextInt(l.size());
Object t = l.get(p);
l.set(p, l.get(i));
l.set(i, t);
private ArrayList getConnectList(){
ArrayList l=new ArrayList(uris);
if (randomize){
// Randomly, reorder the list by random swapping
Random r=new Random();
r.setSeed(System.currentTimeMillis());
for (int i=0;i<l.size();i++){
int p=r.nextInt(l.size());
Object t=l.get(p);
l.set(p,l.get(i));
l.set(i,t);
}
}
return l;
}

View File

@ -0,0 +1,97 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
import java.net.URI;
import javax.jms.*;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.io.ClassPathResource;
public class FTBrokerTest extends TestCase {
protected static final int MESSAGE_COUNT = 10;
protected BrokerService master;
protected BrokerService slave;
protected Connection connection;
protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false";
//protected String uriString = "tcp://localhost:62001";
protected void setUp() throws Exception {
BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/master.xml"));
brokerFactory.afterPropertiesSet();
master = brokerFactory.getBroker();
brokerFactory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/slave.xml"));
brokerFactory.afterPropertiesSet();
slave = brokerFactory.getBroker();
//uriString = "failover://(" + master.getVmConnectorURI() + "," + slave.getVmConnectorURI() + ")?randomize=false";
//uriString = "failover://(" + master.getVmConnectorURI() + "," + slave.getVmConnectorURI() + ")";
System.out.println("URI = " + uriString);
URI uri = new URI(uriString);
ConnectionFactory fac = new ActiveMQConnectionFactory(uri);
connection = fac.createConnection();
master.start();
slave.start();
//wait for thing to connect
Thread.sleep(1000);
connection.start();
super.setUp();
}
protected void tearDown() throws Exception {
try {
connection.close();
slave.stop();
master.stop();
}catch(Throwable e){
e.printStackTrace();
}
super.tearDown();
}
public void testFTBroker() throws Exception{
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getClass().toString());
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < MESSAGE_COUNT; i++){
Message msg = session.createTextMessage("test: " + i);
producer.send(msg);
}
master.stop();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < MESSAGE_COUNT; i++){
System.out.println("GOT MSG: " + consumer.receive(1000));
}
}
}

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2005-2006 The Apache Software Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans xmlns="http://activemq.org/config/1.0">
<broker brokerName="master" persistent="false" useJmx="false">
<transportConnectors>
<transportConnector uri="tcp://localhost:62001"/>
</transportConnectors>
<persistenceAdapter>
<memoryPersistenceAdapter/>
</persistenceAdapter>
</broker>
</beans>

View File

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2005-2006 The Apache Software Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans xmlns="http://activemq.org/config/1.0">
<broker brokerName="slave" persistent="false" useJmx="false" masterConnectorURI="tcp://localhost:62001">
<transportConnectors>
<transportConnector uri="tcp://localhost:62002"/>
</transportConnectors>
<persistenceAdapter>
<memoryPersistenceAdapter/>
</persistenceAdapter>
</broker>
</beans>