Minor refactor so that we can configure the MasterConnector using XML or dependency injection easier. Also added a test case showing how to use a new <masterConnector> element to configure the userName & password of the slave. For background on this issue see: http://www.nabble.com/Master-Slave-with-authentication-tf2094845.html

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@432517 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-08-18 08:30:40 +00:00
parent 8f112fe208
commit ee2462da41
5 changed files with 258 additions and 121 deletions

View File

@ -113,12 +113,13 @@ public class BrokerService implements Service, Serializable {
private List proxyConnectors = new CopyOnWriteArrayList();
private List registeredMBeanNames = new CopyOnWriteArrayList();
private List jmsConnectors = new CopyOnWriteArrayList();
private Service[] services;
private MasterConnector masterConnector;
private String masterConnectorURI;
private transient Thread shutdownHook;
private String[] transportConnectorURIs;
private String[] networkConnectorURIs;
private String[] proxyConnectorURIs;
private String masterConnectorURI;
private JmsConnector[] jmsBridgeConnectors; //these are Jms to Jms bridges to other jms messaging systems
private boolean deleteAllMessagesOnStartup;
private boolean advisorySupport = true;
@ -280,24 +281,6 @@ public class BrokerService implements Service, Serializable {
return null;
}
public void initializeMasterConnector(URI remoteURI) throws Exception {
if (masterConnector != null){
throw new IllegalStateException("Can only be the Slave to one Master");
}
URI localURI = getVmConnectorURI();
TransportConnector connector = null;
if (!transportConnectors.isEmpty()){
connector = (TransportConnector)transportConnectors.get(0);
}
masterConnector = new MasterConnector(this,connector);
masterConnector.setLocalURI(localURI);
masterConnector.setRemoteURI(remoteURI);
if (isUseJmx()) {
registerFTConnectorMBean(masterConnector);
}
}
/**
* @return Returns the masterConnectorURI.
*/
@ -370,15 +353,13 @@ public class BrokerService implements Service, Serializable {
}
getBroker().start();
if (masterConnectorURI!=null){
initializeMasterConnector(new URI(masterConnectorURI));
if (masterConnector!=null){
masterConnector.start();
}
}
startAllConnectors();
if (isUseJmx() && masterConnector != null) {
registerFTConnectorMBean(masterConnector);
}
brokerId = broker.getBrokerId();
log.info("ActiveMQ JMS Message Broker (" + getBrokerName()+", "+brokerId+") started");
}
@ -388,6 +369,7 @@ public class BrokerService implements Service, Serializable {
}
}
public void stop() throws Exception {
if (! started.compareAndSet(true, false)) {
return;
@ -398,10 +380,13 @@ public class BrokerService implements Service, Serializable {
removeShutdownHook();
ServiceStopper stopper = new ServiceStopper();
if (masterConnector != null){
masterConnector.stop();
if (services != null) {
for (int i = 0; i < services.length; i++) {
Service service = services[i];
stopper.stop(service);
}
}
for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) {
NetworkConnector connector = (NetworkConnector) iter.next();
@ -417,8 +402,8 @@ public class BrokerService implements Service, Serializable {
JmsConnector connector = (JmsConnector) iter.next();
stopper.stop(connector);
}
for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) {
TransportConnector connector = (TransportConnector) iter.next();
stopper.stop(connector);
}
@ -658,6 +643,34 @@ public class BrokerService implements Service, Serializable {
this.jmsBridgeConnectors=jmsConnectors;
}
public Service[] getServices() {
return services;
}
/**
* Sets the services associated with this broker such as a {@link MasterConnector}
*/
public void setServices(Service[] services) {
this.services = services;
}
/**
* Adds a new service so that it will be started as part of the broker lifecycle
*/
public void addService(Service service) {
if (services == null) {
services = new Service[] { service };
}
else {
int length = services.length;
Service[] temp = new Service[length + 1];
System.arraycopy(services, 1, temp, 1, length);
temp[length] = service;
services = temp;
}
}
public boolean isUseLoggingForShutdownErrors() {
return useLoggingForShutdownErrors;
}
@ -903,7 +916,14 @@ public class BrokerService implements Service, Serializable {
addJmsConnector(jmsBridgeConnectors[i]);
}
}
if (masterConnectorURI != null) {
if (masterConnector != null) {
throw new IllegalStateException("Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
}
else {
addService(new MasterConnector(masterConnectorURI));
}
}
}
protected void registerConnectorMBean(TransportConnector connector, ObjectName objectName) throws IOException, URISyntaxException {
@ -1269,6 +1289,14 @@ public class BrokerService implements Service, Serializable {
JmsConnector connector = (JmsConnector) iter.next();
connector.start();
}
if (services != null) {
for (int i = 0; i < services.length; i++) {
Service service = services[i];
configureService(service);
service.start();
}
}
}
}
@ -1304,5 +1332,8 @@ public class BrokerService implements Service, Serializable {
BrokerServiceAware serviceAware = (BrokerServiceAware) service;
serviceAware.setBrokerService(this);
}
if (service instanceof MasterConnector) {
masterConnector = (MasterConnector) service;
}
}
}

View File

@ -19,8 +19,12 @@ package org.apache.activemq.broker.ft;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
@ -43,80 +47,103 @@ import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* Used by a Slave Broker to Connect to the Master
* Connects a Slave Broker to a Master when using <a
* href="http://incubator.apache.org/activemq/masterslave.html">Master Slave</a>
* for High Availability of messages.
*
* @org.apache.xbean.XBean
*
* @version $Revision$
*/
public class MasterConnector implements Service{
public class MasterConnector implements Service, BrokerServiceAware {
private static final Log log=LogFactory.getLog(MasterConnector.class);
private static final Log log = LogFactory.getLog(MasterConnector.class);
private BrokerService broker;
private URI remoteURI;
private URI localURI;
private Transport localBroker;
private Transport remoteBroker;
private TransportConnector connector;
private AtomicBoolean masterActive=new AtomicBoolean(false);
private AtomicBoolean started=new AtomicBoolean(false);
private final IdGenerator idGenerator=new IdGenerator();
private AtomicBoolean masterActive = new AtomicBoolean(false);
private AtomicBoolean started = new AtomicBoolean(false);
private final IdGenerator idGenerator = new IdGenerator();
private String userName;
private String password;
private ConnectionInfo connectionInfo;
private SessionInfo sessionInfo;
private ProducerInfo producerInfo;
ConnectionInfo connectionInfo;
SessionInfo sessionInfo;
ProducerInfo producerInfo;
public MasterConnector(BrokerService broker,TransportConnector connector){
this.broker=broker;
this.connector=connector;
public MasterConnector() {
}
public boolean isSlave(){
public MasterConnector(String remoteUri) throws URISyntaxException {
remoteURI = new URI(remoteUri);
}
public void setBrokerService(BrokerService broker) {
this.broker = broker;
if (localURI == null) {
localURI = broker.getVmConnectorURI();
}
if (connector == null) {
List transportConnectors = broker.getTransportConnectors();
if (!transportConnectors.isEmpty()) {
connector = (TransportConnector) transportConnectors.get(0);
}
}
}
public boolean isSlave() {
return masterActive.get();
}
public void start() throws Exception{
if( !started.compareAndSet(false, true) ) {
public void start() throws Exception {
if (!started.compareAndSet(false, true)) {
return;
}
localBroker=TransportFactory.connect(localURI);
remoteBroker=TransportFactory.connect(remoteURI);
log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established.");
if (remoteURI == null) {
throw new IllegalArgumentException("You must specify a remoteURI");
}
localBroker = TransportFactory.connect(localURI);
remoteBroker = TransportFactory.connect(remoteURI);
log.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established.");
localBroker.setTransportListener(new DefaultTransportListener(){
public void onCommand(Command command){
localBroker.setTransportListener(new DefaultTransportListener() {
public void onCommand(Command command) {
}
public void onException(IOException error){
if( started.get() ) {
public void onException(IOException error) {
if (started.get()) {
serviceLocalException(error);
}
}
});
remoteBroker.setTransportListener(new DefaultTransportListener(){
public void onCommand(Command command){
if( started.get() ) {
remoteBroker.setTransportListener(new DefaultTransportListener() {
public void onCommand(Command command) {
if (started.get()) {
serviceRemoteCommand(command);
}
}
public void onException(IOException error){
if( started.get() ) {
public void onException(IOException error) {
if (started.get()) {
serviceRemoteException(error);
}
}
});
masterActive.set(true);
Thread thead=new Thread(){
public void run(){
try{
Thread thead = new Thread() {
public void run() {
try {
localBroker.start();
remoteBroker.start();
startBridge();
}catch(Exception e){
}
catch (Exception e) {
masterActive.set(false);
log.error("Failed to start network bridge: "+e,e);
log.error("Failed to start network bridge: " + e, e);
}
}
};
@ -124,105 +151,109 @@ public class MasterConnector implements Service{
}
protected void startBridge() throws Exception{
connectionInfo=new ConnectionInfo();
protected void startBridge() throws Exception {
connectionInfo = new ConnectionInfo();
connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
connectionInfo.setClientId(idGenerator.generateId());
connectionInfo.setUserName(userName);
connectionInfo.setPassword(password);
localBroker.oneway(connectionInfo);
ConnectionInfo remoteInfo=new ConnectionInfo();
ConnectionInfo remoteInfo = new ConnectionInfo();
connectionInfo.copy(remoteInfo);
remoteInfo.setBrokerMasterConnector(true);
remoteBroker.oneway(connectionInfo);
sessionInfo=new SessionInfo(connectionInfo,1);
sessionInfo = new SessionInfo(connectionInfo, 1);
localBroker.oneway(sessionInfo);
remoteBroker.oneway(sessionInfo);
producerInfo=new ProducerInfo(sessionInfo,1);
producerInfo = new ProducerInfo(sessionInfo, 1);
producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo);
BrokerInfo brokerInfo=null;
if (connector!=null){
BrokerInfo brokerInfo = null;
if (connector != null) {
brokerInfo=connector.getBrokerInfo();
}else{
brokerInfo=new BrokerInfo();
brokerInfo = connector.getBrokerInfo();
}
else {
brokerInfo = new BrokerInfo();
}
brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
brokerInfo.setSlaveBroker(true);
remoteBroker.oneway(brokerInfo);
log.info("Slave connection between "+localBroker+" and "+remoteBroker+" has been established.");
log.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established.");
}
public void stop() throws Exception{
if( !started.compareAndSet(true, false) ) {
public void stop() throws Exception {
if (!started.compareAndSet(true, false)) {
return;
}
masterActive.set(false);
try{
// if (connectionInfo!=null){
// localBroker.request(connectionInfo.createRemoveCommand());
// }
// localBroker.setTransportListener(null);
// remoteBroker.setTransportListener(null);
try {
// if (connectionInfo!=null){
// localBroker.request(connectionInfo.createRemoveCommand());
// }
// localBroker.setTransportListener(null);
// remoteBroker.setTransportListener(null);
remoteBroker.oneway(new ShutdownInfo());
localBroker.oneway(new ShutdownInfo());
}catch(IOException e){
log.debug("Caught exception stopping",e);
}finally{
ServiceStopper ss=new ServiceStopper();
}
catch (IOException e) {
log.debug("Caught exception stopping", e);
}
finally {
ServiceStopper ss = new ServiceStopper();
ss.stop(localBroker);
ss.stop(remoteBroker);
ss.throwFirstException();
}
}
protected void serviceRemoteException(IOException error){
log.error("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
protected void serviceRemoteException(IOException error) {
log.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
shutDown();
}
protected void serviceRemoteCommand(Command command){
try{
if (command.isMessageDispatch()){
MessageDispatch md=(MessageDispatch)command;
command=md.getMessage();
protected void serviceRemoteCommand(Command command) {
try {
if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch) command;
command = md.getMessage();
}
if (command.getDataStructureType()==CommandTypes.SHUTDOWN_INFO){
if (command.getDataStructureType() == CommandTypes.SHUTDOWN_INFO) {
log.warn("The Master has shutdown");
shutDown();
}else{
}
else {
boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId();
localBroker.oneway(command);
if (responseRequired){
Response response=new Response();
if (responseRequired) {
Response response = new Response();
response.setCorrelationId(commandId);
remoteBroker.oneway(response);
}
}
}catch(IOException e){
}
catch (IOException e) {
serviceRemoteException(e);
}
}
protected void serviceLocalException(Throwable error){
log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
protected void serviceLocalException(Throwable error) {
log.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
ServiceSupport.dispose(this);
}
/**
* @return Returns the localURI.
*/
public URI getLocalURI(){
public URI getLocalURI() {
return localURI;
}
@ -230,14 +261,14 @@ public class MasterConnector implements Service{
* @param localURI
* The localURI to set.
*/
public void setLocalURI(URI localURI){
this.localURI=localURI;
public void setLocalURI(URI localURI) {
this.localURI = localURI;
}
/**
* @return Returns the remoteURI.
*/
public URI getRemoteURI(){
public URI getRemoteURI() {
return remoteURI;
}
@ -245,43 +276,44 @@ public class MasterConnector implements Service{
* @param remoteURI
* The remoteURI to set.
*/
public void setRemoteURI(URI remoteURI){
this.remoteURI=remoteURI;
public void setRemoteURI(URI remoteURI) {
this.remoteURI = remoteURI;
}
/**
* @return Returns the password.
*/
public String getPassword(){
public String getPassword() {
return password;
}
/**
* @param password The password to set.
* @param password
* The password to set.
*/
public void setPassword(String password){
this.password=password;
public void setPassword(String password) {
this.password = password;
}
/**
* @return Returns the userName.
*/
public String getUserName(){
public String getUserName() {
return userName;
}
/**
* @param userName The userName to set.
* @param userName
* The userName to set.
*/
public void setUserName(String userName){
this.userName=userName;
public void setUserName(String userName) {
this.userName = userName;
}
private void shutDown(){
private void shutDown() {
masterActive.set(false);
broker.masterFailed();
ServiceSupport.dispose(this);
}
}

View File

@ -40,10 +40,10 @@ public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsT
protected void setUp() throws Exception{
failureCount = super.messageCount/2;
super.topic = isTopic();
BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/master.xml"));
BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new ClassPathResource(getMasterXml()));
brokerFactory.afterPropertiesSet();
master=brokerFactory.getBroker();
brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/slave.xml"));
brokerFactory=new BrokerFactoryBean(new ClassPathResource(getSlaveXml()));
brokerFactory.afterPropertiesSet();
slave=brokerFactory.getBroker();
master.start();
@ -54,6 +54,14 @@ public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsT
}
protected String getSlaveXml() {
return "org/apache/activemq/broker/ft/slave.xml";
}
protected String getMasterXml() {
return "org/apache/activemq/broker/ft/master.xml";
}
protected void tearDown() throws Exception{
super.tearDown();
slave.stop();

View File

@ -0,0 +1,30 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
/**
* Lets test using a <masterConnector/> element instead of the old masterConnectorURI attribute
*
* @version $Revision$
*/
public class QueueMasterSlaveUsingMasterConnectorElementTest extends QueueMasterSlaveTest {
protected String getSlaveXml() {
return "org/apache/activemq/broker/ft/slave2.xml";
}
}

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker brokerName="slave" persistent="false" useJmx="false" deleteAllMessagesOnStartup="true" xmlns="http://activemq.org/config/1.0">
<transportConnectors>
<transportConnector uri="tcp://localhost:62002"/>
</transportConnectors>
<services>
<masterConnector remoteURI= "tcp://localhost:62001" userName="James" password="Cheese"/>
</services>
<persistenceAdapter>
<kahaPersistenceAdapter dir = "${basedir}/target/activemq-data/slave"/>
</persistenceAdapter>
</broker>
</beans>