git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@660977 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-05-28 15:19:30 +00:00
parent ec80f286d2
commit 4ee029ef4e
11 changed files with 72 additions and 35 deletions

View File

@ -333,6 +333,27 @@ public class AdvisoryBroker extends BrokerFilter {
LOG.warn("Failed to fire message is full advisory");
}
}
public void nowMasterBroker() {
super.nowMasterBroker();
try {
ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic();
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty("brokerName", getBrokerName());
String[] uris = getBrokerService().getTransportConnectorURIs();
String uri = getBrokerService().getVmConnectorURI().toString();
if (uris != null && uris.length > 0) {
uri = uris[0];
}
advisoryMessage.setStringProperty("brokerURL", getBrokerName());
advisoryMessage.setStringProperty("brokerURI", uri);
ConnectionContext context = new ConnectionContext();
context.setBroker(getBrokerService().getBroker());
fireAdvisory(context, topic,advisoryMessage);
} catch (Exception e) {
LOG.warn("Failed to fire message master broker advisory");
}
}
protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
fireAdvisory(context, topic, command, null);

View File

@ -45,6 +45,7 @@ public final class AdvisorySupport {
public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
public static final String AGENT_TOPIC = "ActiveMQ.Agent";
public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
public static final String MSG_PROPERTY_ORIGIN_BROKER_ID="originBrokerId";
@ -137,6 +138,10 @@ public final class AdvisorySupport {
return new ActiveMQTopic(name);
}
public static ActiveMQTopic getMasterBrokerAdvisoryTopic() {
return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
}
public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) {
String name = FULL_TOPIC_PREFIX
+ destination.getDestinationTypeAsString() + "."
@ -272,6 +277,20 @@ public final class AdvisorySupport {
}
}
public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) {
if (destination.isComposite()) {
ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
for (int i = 0; i < compositeDestinations.length; i++) {
if (isMasterBrokerAdvisoryTopic(compositeDestinations[i])) {
return true;
}
}
return false;
} else {
return destination.isTopic() && destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX);
}
}
public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
if (destination.isComposite()) {
ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();

View File

@ -365,6 +365,12 @@ public interface Broker extends Region, Service {
* @param usage
*/
void isFull(ConnectionContext context,Destination destination,Usage usage);
/**
* called when the broker becomes the master in a master/slave
* configuration
*/
void nowMasterBroker();
}

View File

@ -290,4 +290,8 @@ public class BrokerFilter implements Broker {
public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
next.slowConsumer(context, destination,subs);
}
public void nowMasterBroker() {
next.nowMasterBroker();
}
}

View File

@ -409,6 +409,7 @@ public class BrokerService implements Service {
LOG.warn("Master Failed - starting all connectors");
try {
startAllConnectors();
broker.nowMasterBroker();
} catch (Exception e) {
LOG.error("Failed to startAllConnectors");
}

View File

@ -221,7 +221,7 @@ public class EmptyBroker implements Broker {
public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
}
public Response messagePull(ConnectionContext context, MessagePull pull) {
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
return null;
}
@ -275,4 +275,7 @@ public class EmptyBroker implements Broker {
public void slowConsumer(ConnectionContext context,Destination destination, Subscription subs) {
}
public void nowMasterBroker() {
}
}

View File

@ -292,4 +292,8 @@ public class ErrorBroker implements Broker {
public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
throw new BrokerStoppedException(this.message);
}
public void nowMasterBroker() {
throw new BrokerStoppedException(this.message);
}
}

View File

@ -302,5 +302,9 @@ public class MutableBrokerFilter implements Broker {
public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs) {
getNext().slowConsumer(context, dest,subs);
}
public void nowMasterBroker() {
getNext().nowMasterBroker();
}
}

View File

@ -25,15 +25,14 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.EmptyBroker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyMap;
@ -52,13 +51,11 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
@ -71,7 +68,7 @@ import org.apache.commons.logging.LogFactory;
*
* @version $Revision$
*/
public class RegionBroker implements Broker {
public class RegionBroker extends EmptyBroker {
private static final Log LOG = LogFactory.getLog(RegionBroker.class);
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
@ -324,12 +321,6 @@ public class RegionBroker implements Broker {
return rc;
}
public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
}
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
}
public void addProducer(ConnectionContext context, ProducerInfo info)
throws Exception {
ActiveMQDestination destination = info.getDestination();
@ -619,10 +610,6 @@ public class RegionBroker implements Broker {
return destinationFactory.getDestinations();
}
public boolean isFaultTolerantConfiguration() {
return false;
}
protected void doStop(ServiceStopper ss) {
ss.stop(queueRegion);
ss.stop(topicRegion);
@ -680,24 +667,6 @@ public class RegionBroker implements Broker {
getRoot().sendToDeadLetterQueue(context, node);
}
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
}
public void isFull(ConnectionContext context,Destination destination, Usage usage) {
}
public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
}
public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
}
public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
}
public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs) {
}
public void sendToDeadLetterQueue(ConnectionContext context,
MessageReference node){
try{

View File

@ -921,7 +921,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
protected void lock() throws IOException, InterruptedException {
protected void lock() throws Exception {
boolean logged = false;
boolean aquiredLock = false;
do {
@ -937,6 +937,9 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
if (aquiredLock && logged) {
LOG.info("Aquired lock for AMQ Store" + getDirectory());
if (brokerService != null) {
brokerService.getBroker().nowMasterBroker();
}
}
} while (!aquiredLock && !disableLocking);

View File

@ -175,6 +175,9 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
} else {
service.start();
if (brokerService != null) {
brokerService.getBroker().nowMasterBroker();
}
}
}