From 9e1dad4cc2b67a543790b490ad33c90e7b64d642 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Fri, 8 Aug 2008 18:48:06 +0000 Subject: [PATCH] more refinements for Group git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@684047 13f79535-47bb-0310-9956-ffa450edef68 --- .../group/DefaultMapChangedListener.java | 6 +- .../org/apache/activemq/group/EntryKey.java | 70 +- .../apache/activemq/group/EntryMessage.java | 53 + .../org/apache/activemq/group/EntryValue.java | 10 +- .../java/org/apache/activemq/group/Group.java | 1742 +++++++++++++++++ .../org/apache/activemq/group/GroupMap.java | 1216 ------------ .../activemq/group/GroupMessageListener.java | 34 + .../org/apache/activemq/group/Member.java | 50 +- .../activemq/group/MemberChangedListener.java | 2 +- .../org/apache/activemq/group/package.html | 2 +- .../apache/activemq/group/GroupMapTest.java | 146 +- ...apMemberTest.java => GroupMemberTest.java} | 80 +- .../activemq/group/GroupMessageTest.java | 256 +++ 13 files changed, 2373 insertions(+), 1294 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/group/Group.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/group/GroupMessageListener.java rename activemq-core/src/test/java/org/apache/activemq/group/{GroupMapMemberTest.java => GroupMemberTest.java} (57%) create mode 100644 activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java b/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java index 7e6f0c2b4c..eecfa0c8b7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java +++ b/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java @@ -22,12 +22,12 @@ package org.apache.activemq.group; */ public class DefaultMapChangedListener implements MapChangedListener{ - public void mapInsert(Member owner, Object key, Object value) { + public void mapInsert(Member owner, Object key, Object value) { } - public void mapRemove(Member owner, Object key, Object value,boolean expired) { + public void mapRemove(Member owner, Object key, Object value,boolean expired) { } - public void mapUpdate(Member owner, Object Key, Object oldValue,Object newValue) { + public void mapUpdate(Member owner, Object key, Object oldValue,Object newValue) { } } diff --git a/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java b/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java index 981c8c8a2f..996277e105 100644 --- a/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java +++ b/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java @@ -28,9 +28,11 @@ import java.io.ObjectOutput; class EntryKey implements Externalizable { private Member owner; private K key; - private boolean share; + private boolean locked; private boolean removeOnExit; + private boolean releaseLockOnExit; private long expiration; + private long lockExpiration; /** * Default constructor - for serialization @@ -53,6 +55,10 @@ class EntryKey implements Externalizable { public Member getOwner() { return this.owner; } + + public void setOwner(Member member) { + this.owner=member; + } /** * @return the key @@ -64,15 +70,15 @@ class EntryKey implements Externalizable { /** * @return the share */ - public boolean isShare() { - return this.share; + public boolean isLocked() { + return this.locked; } /** * @param share the share to set */ - public void setShare(boolean share) { - this.share = share; + public void setLocked(boolean locked) { + this.locked = locked; } /** @@ -104,6 +110,34 @@ class EntryKey implements Externalizable { this.expiration = expiration; } + /** + * @return the lockExpiration + */ + public long getLockExpiration() { + return lockExpiration; + } + + /** + * @param lockExpiration the lockExpiration to set + */ + public void setLockExpiration(long lockExpiration) { + this.lockExpiration = lockExpiration; + } + + /** + * @return the releaseLockOnExit + */ + public boolean isReleaseLockOnExit() { + return releaseLockOnExit; + } + + /** + * @param releaseLockOnExit the releaseLockOnExit to set + */ + public void setReleaseLockOnExit(boolean releaseLockOnExit) { + this.releaseLockOnExit = releaseLockOnExit; + } + void setTimeToLive(long ttl) { if (ttl > 0 ) { this.expiration=ttl+System.currentTimeMillis(); @@ -112,6 +146,14 @@ class EntryKey implements Externalizable { } } + void setLockTimeToLive(long ttl) { + if(ttl > 0) { + this.lockExpiration=ttl+System.currentTimeMillis(); + }else { + this.lockExpiration=0l; + } + } + boolean isExpired() { return isExpired(System.currentTimeMillis()); } @@ -120,6 +162,14 @@ class EntryKey implements Externalizable { return this.expiration > 0 && this.expiration < currentTime; } + boolean isLockExpired() { + return isLockExpired(System.currentTimeMillis()); + } + + boolean isLockExpired(long currentTime) { + return this.lockExpiration > 0 && this.lockExpiration < currentTime; + } + public boolean equals(Object obj) { @@ -134,21 +184,27 @@ class EntryKey implements Externalizable { public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(this.owner); out.writeObject(this.key); - out.writeBoolean(isShare()); + out.writeBoolean(isLocked()); out.writeBoolean(isRemoveOnExit()); + out.writeBoolean(isReleaseLockOnExit()); out.writeLong(getExpiration()); + out.writeLong(getLockExpiration()); } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { this.owner = (Member) in.readObject(); this.key = (K) in.readObject(); - this.share = in.readBoolean(); + this.locked = in.readBoolean(); this.removeOnExit=in.readBoolean(); + this.releaseLockOnExit=in.readBoolean(); this.expiration=in.readLong(); + this.lockExpiration=in.readLong(); } public String toString() { return "key:"+this.key; } + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java b/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java index 4667958702..ef569762f5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java +++ b/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java @@ -29,9 +29,12 @@ public class EntryMessage implements Externalizable{ static enum MessageType{INSERT,DELETE,SYNC}; private EntryKey key; private Object value; + private Object oldValue; private MessageType type; private boolean mapUpdate; private boolean expired; + private boolean lockExpired; + private boolean lockUpdate; /** * @return the owner @@ -58,6 +61,19 @@ public class EntryMessage implements Externalizable{ this.value = value; } + /** + * @return the oldValue + */ + public Object getOldValue() { + return this.oldValue; + } + /** + * @param oldValue the oldValue to set + */ + public void setOldValue(Object oldValue) { + this.oldValue = oldValue; + } + /** * @return the type */ @@ -97,6 +113,32 @@ public class EntryMessage implements Externalizable{ this.expired = expired; } + /** + * @return the lockExpired + */ + public boolean isLockExpired() { + return lockExpired; + } + /** + * @param lockExpired the lockExpired to set + */ + public void setLockExpired(boolean lockExpired) { + this.lockExpired = lockExpired; + } + + /** + * @return the lockUpdate + */ + public boolean isLockUpdate() { + return lockUpdate; + } + /** + * @param lockUpdate the lockUpdate to set + */ + public void setLockUpdate(boolean lockUpdate) { + this.lockUpdate = lockUpdate; + } + /** * @return if insert message */ @@ -115,13 +157,17 @@ public class EntryMessage implements Externalizable{ return this.type != null && this.type.equals(MessageType.SYNC); } + public EntryMessage copy() { EntryMessage result = new EntryMessage(); result.key=this.key; result.value=this.value; + result.oldValue=this.oldValue; result.type=this.type; result.mapUpdate=this.mapUpdate; result.expired=this.expired; + result.lockExpired=this.lockExpired; + result.lockUpdate=this.lockUpdate; return result; } @@ -131,21 +177,28 @@ public class EntryMessage implements Externalizable{ ClassNotFoundException { this.key=(EntryKey) in.readObject(); this.value=in.readObject(); + this.oldValue=in.readObject(); this.type=(MessageType) in.readObject(); this.mapUpdate=in.readBoolean(); this.expired=in.readBoolean(); + this.lockExpired=in.readBoolean(); + this.lockUpdate=in.readBoolean(); } public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(this.key); out.writeObject(this.value); + out.writeObject(this.oldValue); out.writeObject(this.type); out.writeBoolean(this.mapUpdate); out.writeBoolean(this.expired); + out.writeBoolean(this.lockExpired); + out.writeBoolean(this.lockUpdate); } public String toString() { return "EntryMessage: "+this.type + "[" + this.key + "," + this.value + "]{update=" + this.mapUpdate + "}"; } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java b/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java index 468faf0a34..791f67d360 100644 --- a/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java +++ b/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java @@ -22,20 +22,20 @@ package org.apache.activemq.group; * */ class EntryValue { - private Member owner; + private EntryKey key; private V value; - EntryValue(Member owner, V value){ - this.owner=owner; + EntryValue(EntryKey key, V value){ + this.key=key; this.value=value; } /** * @return the owner */ - public Member getOwner() { - return this.owner; + public EntryKey getKey() { + return this.key; } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/group/Group.java b/activemq-core/src/main/java/org/apache/activemq/group/Group.java new file mode 100644 index 0000000000..416c5405fd --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/group/Group.java @@ -0,0 +1,1742 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.group; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; +import org.apache.activemq.ActiveMQMessageConsumer; +import org.apache.activemq.Service; +import org.apache.activemq.advisory.ConsumerEvent; +import org.apache.activemq.advisory.ConsumerEventSource; +import org.apache.activemq.advisory.ConsumerListener; +import org.apache.activemq.thread.SchedulerTimerTask; +import org.apache.activemq.util.IdGenerator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + *

+ * A Group is a distributed collaboration implementation that is + * used to shared state and process messages amongst a distributed group of + * other Group instances. + * Membership of a group is handled automatically using discovery. + *

+ * The underlying transport is JMS and there are some optimizations that occur + * for membership if used with ActiveMQ - but Group can be used + * with any JMS implementation. + * + *

+ * Updates to the group shared map are controlled by a coordinator. The + * coordinator is elected by the member with the lowest lexicographical id - + * based on the bully algorithm [Silberschatz et al. 1993] + *

+ * The {@link #selectCordinator(Collection members)} method may be + * overridden to implement a custom mechanism for choosing how the coordinator + * is elected for the map. + *

+ * New Group instances have their state updated by the coordinator, + * and coordinator failure is handled automatically within the group. + *

+ * All map updates are totally ordered through the coordinator, whilst read operations + * happen locally. + *

+ * A Group supports the concept of owner only updates(write locks), + * shared updates, entry expiration times and removal on owner exit - + * all of which are optional. In addition, you can grab and release locks for + * values in the map, independently of who created them. + *

+ * In addition, members of a group can broadcast messages and implement request/response + * with other Group instances. + * + *

+ * + * @param the key type + * @param the value type + * + */ +public class Group implements Map, Service { + /** + * default interval within which to detect a member failure + */ + public static final long DEFAULT_HEART_BEAT_INTERVAL = 2000; + private static final long EXPIRATION_SWEEP_INTERVAL = 500; + private static final Log LOG = LogFactory.getLog(Group.class); + private static final String STATE_PREFIX = "STATE."+Group.class.getName()+ "."; + private static final String GROUP_MESSAGE_PREFIX = "MESSAGE."+Group.class.getName()+ "."; + private static final String STATE_TYPE = "state"; + private static final String MESSAGE_TYPE = "message"; + private static final String MEMBER_ID_PROPERTY="memberId"; + protected Member local; + private final Object mapMutex = new Object(); + private Map> localMap; + private Map members = new ConcurrentHashMap(); + private Map stateRequests = new HashMap(); + private Map messageRequests = new HashMap(); + private List membershipListeners = new CopyOnWriteArrayList(); + private List mapChangedListeners = new CopyOnWriteArrayList(); + private List groupMessageListeners = new CopyOnWriteArrayList(); + + private Member coordinator; + private String groupName; + private boolean alwaysLock; + private Connection connection; + private Session stateSession; + private Session messageSession; + private Topic stateTopic; + private Topic heartBeatTopic; + private Topic inboxTopic; + private Topic messageTopic; + private Queue messageQueue; + private MessageProducer stateProducer; + private MessageProducer messageProducer; + private ConsumerEventSource consumerEvents; + private AtomicBoolean started = new AtomicBoolean(); + private SchedulerTimerTask heartBeatTask; + private SchedulerTimerTask checkMembershipTask; + private SchedulerTimerTask expirationTask; + private Timer timer; + private long heartBeatInterval = DEFAULT_HEART_BEAT_INTERVAL; + private IdGenerator idGenerator = new IdGenerator(); + private boolean removeOwnedObjectsOnExit; + private boolean releaseLockOnExit=true; + private int timeToLive; + private int lockTimeToLive; + private int minimumGroupSize = 1; + private int coordinatorWeight=0; + private final AtomicBoolean electionFinished = new AtomicBoolean(true); + private ExecutorService stateExecutor; + private ExecutorService messageExecutor; + private ThreadPoolExecutor electionExecutor; + private final Object memberMutex = new Object(); + + /** + * @param connection + * @param name + */ + public Group(Connection connection, String name) { + this(connection, "default", name); + } + + /** + * @param connection + * @param groupName + * @param name + */ + public Group(Connection connection, String groupName, String name) { + this.connection = connection; + this.local = new Member(name); + this.coordinator = this.local; + this.groupName = groupName; + } + + /** + * Set the local map implementation to be used By default its a HashMap - + * but you could use a Cache for example + * + * @param map + */ + public void setLocalMap(Map map) { + synchronized (this.mapMutex) { + this.localMap = map; + } + } + + /** + * Start membership to the group + * + * @throws Exception + * + */ + public void start() throws Exception { + if (this.started.compareAndSet(false, true)) { + synchronized (this.mapMutex) { + if (this.localMap == null) { + this.localMap = new HashMap>(); + } + } + this.connection.start(); + this.stateSession = this.connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + this.messageSession = this.connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + this.stateProducer = this.stateSession.createProducer(null); + this.stateProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + this.inboxTopic = this.stateSession.createTemporaryTopic(); + + String stateTopicName = STATE_PREFIX + this.groupName; + this.stateTopic = this.stateSession.createTopic(stateTopicName); + this.heartBeatTopic = this.stateSession.createTopic(stateTopicName + + ".heartbeat"); + + String messageDestinationName = GROUP_MESSAGE_PREFIX+ this.groupName; + this.messageTopic = this.messageSession.createTopic(messageDestinationName); + this.messageQueue = this.messageSession.createQueue(messageDestinationName); + MessageConsumer privateInbox = this.messageSession + .createConsumer(this.inboxTopic); + + + + privateInbox.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + processJMSMessage(message); + } + }); + MessageConsumer mapChangeConsumer = this.stateSession + .createConsumer(this.stateTopic); + String memberId = null; + if (mapChangeConsumer instanceof ActiveMQMessageConsumer) { + memberId = ((ActiveMQMessageConsumer) mapChangeConsumer) + .getConsumerId().toString(); + } else { + memberId = this.idGenerator.generateId(); + } + this.local.setId(memberId); + this.local.setInBoxDestination(this.inboxTopic); + this.local.setCoordinatorWeight(getCoordinatorWeight()); + mapChangeConsumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + processJMSMessage(message); + } + }); + + this.messageProducer = this.messageSession.createProducer(null); + this.messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + MessageConsumer topicMessageConsumer = this.messageSession.createConsumer(this.messageTopic); + topicMessageConsumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + processJMSMessage(message); + } + }); + + MessageConsumer queueMessageConsumer = this.messageSession.createConsumer(this.messageQueue); + queueMessageConsumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + processJMSMessage(message); + } + }); + + + MessageConsumer heartBeatConsumer = this.stateSession + .createConsumer(this.heartBeatTopic); + heartBeatConsumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + handleHeartbeats(message); + } + }); + this.consumerEvents = new ConsumerEventSource(this.connection, + this.stateTopic); + this.consumerEvents.setConsumerListener(new ConsumerListener() { + public void onConsumerEvent(ConsumerEvent event) { + handleConsumerEvents(event); + } + }); + this.consumerEvents.start(); + + this.electionExecutor = new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "Election{" + + Group.this.local + "}"); + thread.setDaemon(true); + return thread; + } + }); + + this.stateExecutor = Executors + .newSingleThreadExecutor(new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "Group State{" + + Group.this.local + "}"); + thread.setDaemon(true); + return thread; + } + }); + + this.messageExecutor = Executors + .newSingleThreadExecutor(new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "Group Messages{" + + Group.this.local + "}"); + thread.setDaemon(true); + return thread; + } + }); + sendHeartBeat(); + this.heartBeatTask = new SchedulerTimerTask(new Runnable() { + public void run() { + sendHeartBeat(); + } + }); + this.checkMembershipTask = new SchedulerTimerTask(new Runnable() { + public void run() { + checkMembership(); + } + }); + + this.expirationTask = new SchedulerTimerTask(new Runnable() { + public void run() { + expirationSweep(); + } + }); + this.timer = new Timer("Distributed heart beat", true); + this.timer.scheduleAtFixedRate(this.heartBeatTask, + getHeartBeatInterval() / 3, getHeartBeatInterval() / 2); + this.timer.scheduleAtFixedRate(this.checkMembershipTask, + getHeartBeatInterval(), getHeartBeatInterval()); + this.timer.scheduleAtFixedRate(this.expirationTask, + EXPIRATION_SWEEP_INTERVAL, EXPIRATION_SWEEP_INTERVAL); + // await for members to join + long timeout = this.heartBeatInterval * this.minimumGroupSize; + long deadline = System.currentTimeMillis() + timeout; + while ((this.members.size() < this.minimumGroupSize || !this.electionFinished + .get()) + && timeout > 0) { + synchronized (this.memberMutex) { + this.memberMutex.wait(timeout); + } + timeout = Math.max(deadline - System.currentTimeMillis(), 0); + } + } + } + + /** + * stop membership to the group + * + * @throws Exception + */ + public void stop() { + if (this.started.compareAndSet(true, false)) { + this.expirationTask.cancel(); + this.checkMembershipTask.cancel(); + this.heartBeatTask.cancel(); + this.expirationTask.cancel(); + this.timer.purge(); + if (this.electionExecutor != null) { + this.electionExecutor.shutdownNow(); + } + if (this.stateExecutor != null) { + this.stateExecutor.shutdownNow(); + } + if(this.messageExecutor != null) { + this.messageExecutor.shutdownNow(); + } + try { + this.consumerEvents.stop(); + this.stateSession.close(); + this.messageSession.close(); + } catch (Exception e) { + LOG.debug("Caught exception stopping", e); + } + } + } + + /** + * @return true if there is elections have finished + */ + public boolean isElectionFinished() { + return this.electionFinished.get(); + } + + /** + * @return the partitionName + */ + public String getGroupName() { + return this.groupName; + } + + /** + * @return the name ofthis map + */ + public String getName() { + return this.local.getName(); + } + + /** + * @return true if by default always lock objects (default is false) + */ + public boolean isAlwaysLock() { + return this.alwaysLock; + } + + /** + * @param alwaysLock - set true if objects inserted will always + * be locked (default is false) + */ + public void setAlwaysLock(boolean alwaysLock) { + this.alwaysLock = alwaysLock; + } + + /** + * @return the heartBeatInterval + */ + public long getHeartBeatInterval() { + return this.heartBeatInterval; + } + + /** + * @param heartBeatInterval + * the heartBeatInterval to set + */ + public void setHeartBeatInterval(long heartBeatInterval) { + this.heartBeatInterval = heartBeatInterval; + } + + /** + * Add a listener for membership changes + * @param l + */ + public void addMemberChangedListener(MemberChangedListener l) { + this.membershipListeners.add(l); + } + + /** + * Remove a listener for membership changes + * @param l + */ + public void removeMemberChangedListener(MemberChangedListener l) { + this.membershipListeners.remove(l); + } + + /** + * Add a listener for map changes + * @param l + */ + public void addMapChangedListener(MapChangedListener l) { + this.mapChangedListeners.add(l); + } + + /** + * Remove a listener for map changes + * @param l + */ + public void removeMapChangedListener(MapChangedListener l) { + this.mapChangedListeners.remove(l); + } + + /** + * Add a listener for group messages + * @param l + */ + public void addGroupMessageListener(GroupMessageListener l) { + this.groupMessageListeners.add(l); + } + + + /** + * remove a listener for group messages + * @param l + */ + public void removeGroupMessageListener(GroupMessageListener l) { + this.groupMessageListeners.remove(l); + } + + + /** + * @return the timeToLive + */ + public int getTimeToLive() { + return this.timeToLive; + } + + /** + * @param timeToLive + * the timeToLive to set + */ + public void setTimeToLive(int timeToLive) { + this.timeToLive = timeToLive; + } + + /** + * @return the removeOwnedObjectsOnExit + */ + public boolean isRemoveOwnedObjectsOnExit() { + return this.removeOwnedObjectsOnExit; + } + + /** + * Sets the policy for owned objects in the group If set to true, when this + * GroupMap stops, + * any objects it owns will be removed from the group map + * @param removeOwnedObjectsOnExit the removeOwnedObjectsOnExit to set + */ + public void setRemoveOwnedObjectsOnExit(boolean removeOwnedObjectsOnExit) { + this.removeOwnedObjectsOnExit = removeOwnedObjectsOnExit; + } + + /** + * @return releaseLockOnExit - true by default + */ + public boolean isReleaseLockOnExit() { + return releaseLockOnExit; + } + + /** + * set release lock on exit - true by default + * @param releaseLockOnExit the releaseLockOnExit to set + */ + public void setReleaseLockOnExit(boolean releaseLockOnExit) { + this.releaseLockOnExit = releaseLockOnExit; + } + + /** + * @return the lockTimeToLive + */ + public int getLockTimeToLive() { + return lockTimeToLive; + } + + /** + * @param lockTimeToLive the lockTimeToLive to set + */ + public void setLockTimeToLive(int lockTimeToLive) { + this.lockTimeToLive = lockTimeToLive; + } + + /** + * @return the minimumGroupSize + */ + public int getMinimumGroupSize() { + return this.minimumGroupSize; + } + + /** + * @param minimumGroupSize + * the minimumGroupSize to set + */ + public void setMinimumGroupSize(int minimumGroupSize) { + this.minimumGroupSize = minimumGroupSize; + } + + /** + * @return the coordinatorWeight + */ + public int getCoordinatorWeight() { + return this.coordinatorWeight; + } + /** + * @param coordinatorWeight the coordinatorWeight to set + */ + public void setCoordinatorWeight(int coordinatorWeight) { + this.coordinatorWeight = coordinatorWeight; + } + + /** + * clear entries from the Map + * + * @throws IllegalStateException + */ + public void clear() throws IllegalStateException { + checkStatus(); + if (this.localMap != null && !this.localMap.isEmpty()) { + Set keys = null; + synchronized (this.mapMutex) { + keys = new HashSet(this.localMap.keySet()); + } + for (K key : keys) { + remove(key); + } + } + this.localMap.clear(); + } + + public boolean containsKey(Object key) { + synchronized (this.mapMutex) { + return this.localMap != null ? this.localMap.containsKey(key) + : false; + } + } + + public boolean containsValue(Object value) { + EntryValue entryValue = new EntryValue(null, value); + synchronized (this.mapMutex) { + return this.localMap != null ? this.localMap + .containsValue(entryValue) : false; + } + } + + public Set> entrySet() { + Map result = new HashMap(); + synchronized (this.mapMutex) { + if (this.localMap != null) { + for (EntryValue entry : this.localMap.values()) { + result.put((K) entry.getKey(), entry.getValue()); + } + } + } + return result.entrySet(); + } + + public V get(Object key) { + EntryValue value = null; + synchronized (this.mapMutex) { + value = this.localMap != null ? this.localMap.get(key) : null; + } + return value != null ? value.getValue() : null; + } + + public boolean isEmpty() { + synchronized (this.mapMutex) { + return this.localMap != null ? this.localMap.isEmpty() : true; + } + } + + public Set keySet() { + Set result = null; + synchronized(this.mapMutex) { + result = new HashSet(this.localMap.keySet()); + } + return result; + } + + /** + * Puts an value into the map associated with the key + * + * @param key + * @param value + * @return the old value or null + * @throws GroupMapUpdateException + * @throws IllegalStateException + * + */ + public V put(K key, V value) throws GroupMapUpdateException, + IllegalStateException { + return put(key, value, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),isReleaseLockOnExit(), + getTimeToLive(),getLockTimeToLive()); + } + + /** + * Puts an value into the map associated with the key + * + * @param key + * @param value + * @param lock + * @param removeOnExit + * @param releaseLockOnExit + * @param timeToLive + * @param lockTimeToLive + * @return the old value or null + * @throws GroupMapUpdateException + * @throws IllegalStateException + * + */ + public V put(K key, V value, boolean lock, boolean removeOnExit,boolean releaseLockOnExit, + long timeToLive,long lockTimeToLive) throws GroupMapUpdateException, + IllegalStateException { + checkStatus(); + EntryKey entryKey = new EntryKey(this.local, key); + entryKey.setLocked(lock); + entryKey.setRemoveOnExit(removeOnExit); + entryKey.setReleaseLockOnExit(releaseLockOnExit); + entryKey.setTimeToLive(timeToLive); + entryKey.setLockTimeToLive(lockTimeToLive); + EntryMessage entryMsg = new EntryMessage(); + entryMsg.setKey(entryKey); + entryMsg.setValue(value); + entryMsg.setType(EntryMessage.MessageType.INSERT); + return (V) sendStateRequest(getCoordinator(), entryMsg); + } + + /** + * Remove a lock on a key + * @param key + * @throws GroupMapUpdateException + */ + public void unlock(K key) throws GroupMapUpdateException{ + EntryKey entryKey = new EntryKey(this.local, key); + entryKey.setLocked(false); + EntryMessage entryMsg = new EntryMessage(); + entryMsg.setKey(entryKey); + entryMsg.setLockUpdate(true); + sendStateRequest(getCoordinator(), entryMsg); + } + + /** + * Lock a key in the distributed map + * @param key + * @throws GroupMapUpdateException + */ + public void lock(K key) throws GroupMapUpdateException{ + lock(key,getLockTimeToLive()); + } + + /** + * Lock a key in the distributed map + * @param key + * @param lockTimeToLive + * @throws GroupMapUpdateException + */ + public void lock(K key,long lockTimeToLive) throws GroupMapUpdateException{ + EntryKey entryKey = new EntryKey(this.local, key); + entryKey.setLocked(true); + entryKey.setLockTimeToLive(lockTimeToLive); + EntryMessage entryMsg = new EntryMessage(); + entryMsg.setKey(entryKey); + entryMsg.setLockUpdate(true); + sendStateRequest(getCoordinator(), entryMsg); + } + + /** + * Add the Map to the distribution + * + * @param t + * @throws GroupMapUpdateException + * @throws IllegalStateException + */ + public void putAll(Map t) + throws GroupMapUpdateException, IllegalStateException { + putAll(t, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),isReleaseLockOnExit(), + getTimeToLive(),getLockTimeToLive()); + } + + /** + * Add the Map to the distribution + * + * @param t + * @param lock + * @param removeOnExit + * @param releaseLockOnExit + * @param timeToLive + * @param lockTimeToLive + * @throws GroupMapUpdateException + * @throws IllegalStateException + */ + public void putAll(Map t, boolean lock, boolean removeOnExit,boolean releaseLockOnExit, + long timeToLive,long lockTimeToLive) + throws GroupMapUpdateException, IllegalStateException { + for (java.util.Map.Entry entry : t.entrySet()) { + put(entry.getKey(), entry.getValue(), lock, removeOnExit, + releaseLockOnExit, timeToLive, lockTimeToLive); + } + } + + /** + * remove a value from the map associated with the key + * + * @param key + * @return the Value or null + * @throws GroupMapUpdateException + * @throws IllegalStateException + * + */ + public V remove(Object key) throws GroupMapUpdateException, + IllegalStateException { + EntryKey entryKey = new EntryKey(this.local, (K) key); + return doRemove(entryKey); + } + + V doRemove(EntryKey key) throws GroupMapUpdateException, + IllegalStateException { + checkStatus(); + EntryMessage entryMsg = new EntryMessage(); + entryMsg.setKey(key); + entryMsg.setType(EntryMessage.MessageType.DELETE); + return (V) sendStateRequest(getCoordinator(), entryMsg); + } + + public int size() { + synchronized (this.mapMutex) { + return this.localMap != null ? this.localMap.size() : 0; + } + } + + public Collection values() { + List result = new ArrayList(); + synchronized (this.mapMutex) { + if (this.localMap != null) { + for (EntryValue value : this.localMap.values()) { + result.add(value.getValue()); + } + } + } + return result; + } + + /** + * @return a set of the members + */ + public Set members() { + Set result = new HashSet(); + result.addAll(this.members.values()); + return result; + } + + /** + * Get a member by its unique id + * @param id + * @return + */ + public Member getMemberById(String id) { + return this.members.get(id); + } + + /** + * Return a member of the Group with the matching name + * @param name + * @return + */ + public Member getMemberByName(String name) { + if (name != null) { + for (Member member :this.members.values()) { + if(member.getName().equals(name)) { + return member; + } + } + } + return null; + } + + /** + * @return the local member that represents this Group instance + */ + public Member getLocalMember() { + return this.local; + } + + /** + * @param key + * @return true if this is the owner of the key + */ + public boolean isOwner(K key) { + EntryValue entryValue = null; + synchronized (this.mapMutex) { + entryValue = this.localMap != null ? this.localMap.get(key) + : null; + } + boolean result = false; + if (entryValue != null) { + result = entryValue.getKey().getOwner().getId().equals(this.local.getId()); + } + return result; + } + + /** + * Get the owner of a key + * + * @param key + * @return the owner - or null if the key doesn't exist + */ + EntryKey getKey(Object key) { + EntryValue entryValue = null; + synchronized (this.mapMutex) { + entryValue = this.localMap != null ? this.localMap.get(key) + : null; + } + return entryValue != null ? entryValue.getKey() : null; + } + + /** + * @return true if the coordinator for the map + */ + public boolean isCoordinator() { + return this.local.equals(this.coordinator); + } + + /** + * @return the coordinator + */ + public Member getCoordinator() { + return this.coordinator; + } + + /** + * Broadcast a message to the group + * @param message + * @throws JMSException + */ + public void broadcastMessage(Object message) throws JMSException { + checkStatus(); + ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message); + objMsg.setJMSCorrelationID(this.idGenerator.generateId()); + objMsg.setJMSType(MESSAGE_TYPE); + objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId()); + this.messageProducer.send(this.messageTopic, objMsg); + } + + /** + * As the group for a response - one will be selected from the group + * @param member + * @param message + * @param timeout in milliseconds - a value if 0 means wait until complete + * @return + * @throws JMSException + */ + public Serializable broadcastMessageRequest(Object message, long timeout) + throws JMSException { + checkStatus(); + Object result = null; + MapRequest request = new MapRequest(); + String id = this.idGenerator.generateId(); + synchronized (this.messageRequests) { + this.messageRequests.put(id, request); + } + ObjectMessage objMsg = this.stateSession + .createObjectMessage((Serializable) message); + objMsg.setJMSReplyTo(this.inboxTopic); + objMsg.setJMSCorrelationID(id); + objMsg.setJMSType(MESSAGE_TYPE); + objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId()); + this.messageProducer.send(this.messageQueue, objMsg); + result = request.get(timeout); + return (Serializable) result; + } + + /** + * Send a message to the group - but only the least loaded member will process it + * @param message + * @throws JMSException + */ + public void sendMessage(Object message) throws JMSException { + checkStatus(); + ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message); + objMsg.setJMSCorrelationID(this.idGenerator.generateId()); + objMsg.setJMSType(MESSAGE_TYPE); + objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId()); + this.messageProducer.send(this.messageQueue, objMsg); + } + + /** + * Send a message to an individual member + * @param member + * @param message + * @throws JMSException + */ + public void sendMessage(Member member, Object message) throws JMSException { + checkStatus(); + ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message); + objMsg.setJMSCorrelationID(this.idGenerator.generateId()); + objMsg.setJMSType(MESSAGE_TYPE); + objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId()); + this.messageProducer.send(member.getInBoxDestination(), objMsg); + } + + /** + * Send a request to a member + * @param member + * @param message + * @param timeout in milliseconds - a value if 0 means wait until complete + * @return the request or null + * @throws JMSException + */ + public Object sendMessageRequest(Member member, Object message,long timeout) throws JMSException { + checkStatus(); + Object result = null; + MapRequest request = new MapRequest(); + String id = this.idGenerator.generateId(); + synchronized (this.messageRequests) { + this.messageRequests.put(id, request); + } + ObjectMessage objMsg = this.stateSession + .createObjectMessage((Serializable) message); + objMsg.setJMSReplyTo(this.inboxTopic); + objMsg.setJMSCorrelationID(id); + objMsg.setJMSType(MESSAGE_TYPE); + objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId()); + this.messageProducer.send(member.getInBoxDestination(), objMsg); + result = request.get(timeout); + return result; + } + + /** + * send a response to a message + * @param member + * @param replyId + * @param message + * @throws JMSException + */ + public void sendMessageResponse(Member member,String replyId, Object message) throws JMSException { + checkStatus(); + ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message); + objMsg.setJMSCorrelationID(replyId); + objMsg.setJMSType(MESSAGE_TYPE); + objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId()); + this.messageProducer.send(member.getInBoxDestination(), objMsg); + } + + /** + * Select a coordinator - coordinator weighting is used - or if everything + * is equal - a comparison of member ids. + * + * @param members + * @return + */ + protected Member selectCordinator(List list) { + Collections.sort(list, new Comparator() { + public int compare(Member m1, Member m2) { + int result = m1.getCoordinatorWeight() + - m2.getCoordinatorWeight(); + if (result == 0) { + result = m1.getId().compareTo(m2.getId()); + } + return result; + } + }); + Member result = list.isEmpty() ? this.local : list.get(list.size()-1); + return result; + } + + Object sendStateRequest(Member member, Serializable payload) { + Object result = null; + MapRequest request = new MapRequest(); + String id = this.idGenerator.generateId(); + synchronized (this.stateRequests) { + this.stateRequests.put(id, request); + } + try { + ObjectMessage objMsg = this.stateSession.createObjectMessage(payload); + objMsg.setJMSReplyTo(this.inboxTopic); + objMsg.setJMSCorrelationID(id); + objMsg.setJMSType(STATE_TYPE); + this.stateProducer.send(member.getInBoxDestination(), objMsg); + result = request.get(getHeartBeatInterval()); + } catch (JMSException e) { + if (this.started.get()) { + LOG.error("Failed to send request " + payload, e); + } + } + if (result instanceof GroupMapUpdateException) { + throw (GroupMapUpdateException) result; + } + if (result instanceof EntryMessage) { + EntryMessage entryMsg = (EntryMessage) result; + result = entryMsg.getOldValue(); + } + return result; + } + + void sendAsyncStateRequest(AsyncMapRequest asyncRequest, Member member, + Serializable payload) { + MapRequest request = new MapRequest(); + String id = this.idGenerator.generateId(); + asyncRequest.add(id, request); + synchronized (this.stateRequests) { + this.stateRequests.put(id, request); + } + try { + ObjectMessage objMsg = this.stateSession.createObjectMessage(payload); + objMsg.setJMSReplyTo(this.inboxTopic); + objMsg.setJMSCorrelationID(id); + objMsg.setJMSType(STATE_TYPE); + this.stateProducer.send(member.getInBoxDestination(), objMsg); + } catch (JMSException e) { + if (this.started.get()) { + LOG.error("Failed to send async request " + payload, e); + } + } + } + + void sendReply(Object reply, Destination replyTo, String id) { + if (this.started.get()) { + if (replyTo != null) { + if (replyTo.equals(this.local.getInBoxDestination())) { + processRequest(id, reply); + } else { + try { + ObjectMessage replyMsg = this.stateSession + .createObjectMessage((Serializable) reply); + replyMsg.setJMSCorrelationID(id); + replyMsg.setJMSType(STATE_TYPE); + this.stateProducer.send(replyTo, replyMsg); + } catch (JMSException e) { + LOG.error("Couldn't send reply from co-ordinator", e); + } + } + } else { + LOG.error("NULL replyTo destination"); + } + } + } + + void broadcastMapUpdate(EntryMessage entry, String correlationId) { + if(this.started.get()) { + try { + EntryMessage copy = entry.copy(); + copy.setMapUpdate(true); + ObjectMessage objMsg = this.stateSession.createObjectMessage(copy); + objMsg.setJMSCorrelationID(correlationId); + objMsg.setJMSType(STATE_TYPE); + this.stateProducer.send(this.stateTopic, objMsg); + } catch (JMSException e) { + if (this.started.get()) { + LOG.error("Failed to send EntryMessage " + entry, e); + } + } + } + } + + + + void processJMSMessage(Message message) { + if (message instanceof ObjectMessage) { + ObjectMessage objMsg = (ObjectMessage) message; + + try { + String messageType = objMsg.getJMSType(); + String id = objMsg.getJMSCorrelationID(); + String memberId = objMsg.getStringProperty(MEMBER_ID_PROPERTY); + Destination replyTo = objMsg.getJMSReplyTo(); + Object payload = objMsg.getObject(); + if (messageType != null) { + if (messageType.equals(STATE_TYPE)) { + if (payload instanceof Member) { + handleHeartbeats((Member) payload); + } else if (payload instanceof EntryMessage) { + EntryMessage entryMsg = (EntryMessage) payload; + entryMsg = entryMsg.copy(); + if(entryMsg.isLockUpdate()) { + processLockUpdate(entryMsg, replyTo, id); + } + else if (entryMsg.isMapUpdate()) { + processMapUpdate(entryMsg); + } else { + processEntryMessage(entryMsg, replyTo, id); + } + } else if (payload instanceof ElectionMessage) { + ElectionMessage electionMsg = (ElectionMessage) payload; + electionMsg = electionMsg.copy(); + processElectionMessage(electionMsg, replyTo, id); + } + + }else if (messageType.equals(MESSAGE_TYPE)) { + processGroupMessage(memberId,id, replyTo, payload); + }else { + LOG.error("Unknown message type: " + messageType); + } + processRequest(id, payload); + }else { + LOG.error("Can't process a message type of null"); + } + } catch (JMSException e) { + LOG.warn("Failed to process message: " + message, e); + } + } + } + + void processRequest(String id, Object value) { + if (id != null) { + MapRequest result = null; + synchronized (this.stateRequests) { + result = this.stateRequests.remove(id); + } + if (result != null) { + result.put(id, value); + } + } + } + + void processLockUpdate(EntryMessage entryMsg, Destination replyTo, + String correlationId) { + synchronized (this.mapMutex) { + boolean newLock = entryMsg.getKey().isLocked(); + Member newOwner = entryMsg.getKey().getOwner(); + long newLockExpiration = newLock ? entryMsg.getKey().getLockExpiration():0l; + System.err.println(getName()+" PROC LOC = " + newOwner.getName()+ " LOCK = "+newLock); + + if (isCoordinator() && !entryMsg.isMapUpdate()) { + + EntryKey originalKey = getKey(entryMsg.getKey().getKey()); + if (originalKey != null) { + if (originalKey.isLocked()) { + if (!originalKey.getOwner().equals( + entryMsg.getKey().getOwner())) { + Serializable reply = new GroupMapUpdateException( + "Owned by " + originalKey.getOwner()); + sendReply(reply, replyTo, correlationId); + System.err.println(getName()+" PROC LOC SEND EXCEPTION TO " + newOwner.getName()); + } else { + originalKey.setLocked(newLock); + originalKey.setOwner(newOwner); + originalKey.setLockExpiration(newLockExpiration); + broadcastMapUpdate(entryMsg, correlationId); + + } + } else { + originalKey.setLocked(newLock); + originalKey.setOwner(newOwner); + originalKey.setLockExpiration(newLockExpiration); + broadcastMapUpdate(entryMsg, correlationId); + } + + } + } else { + EntryKey originalKey = getKey(entryMsg.getKey().getKey()); + if (originalKey != null) { + originalKey.setLocked(newLock); + originalKey.setOwner(newOwner); + originalKey.setLockExpiration(newLockExpiration); + } + } + } + } + + void processEntryMessage(EntryMessage entryMsg, Destination replyTo, + String correlationId) { + if (isCoordinator()) { + EntryKey key = entryMsg.getKey(); + EntryValue value = new EntryValue(key, + (V) entryMsg.getValue()); + boolean insert = entryMsg.isInsert(); + boolean containsKey = false; + synchronized (this.mapMutex) { + containsKey = this.localMap.containsKey(key.getKey()); + } + if (containsKey) { + EntryKey originalKey = getKey(key.getKey()); + if (originalKey.equals(key.getOwner()) || !originalKey.isLocked()) { + EntryValue old = null; + if (insert) { + synchronized (this.mapMutex) { + old = this.localMap.put(key.getKey(), value); + } + } else { + synchronized (this.mapMutex) { + old = this.localMap.remove(key.getKey()); + } + } + entryMsg.setOldValue(old.getValue()); + broadcastMapUpdate(entryMsg, correlationId); + fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), value + .getValue(), false); + } else { + Serializable reply = new GroupMapUpdateException( + "Owned by " + originalKey.getOwner()); + sendReply(reply, replyTo, correlationId); + } + } else { + if (insert) { + synchronized (this.mapMutex) { + this.localMap.put(key.getKey(), value); + } + broadcastMapUpdate(entryMsg, correlationId); + fireMapChanged(key.getOwner(), key.getKey(), null, value + .getValue(), false); + } else { + sendReply(null, replyTo, correlationId); + } + } + } + } + + void processMapUpdate(EntryMessage entryMsg) { + boolean containsKey = false; + EntryKey key = entryMsg.getKey(); + EntryValue value = new EntryValue(key, (V) entryMsg + .getValue()); + boolean insert = entryMsg.isInsert()||entryMsg.isSync(); + synchronized (this.mapMutex) { + containsKey = this.localMap.containsKey(key.getKey()); + } + if (!isCoordinator() || entryMsg.isSync()) { + if (containsKey) { + if (key.isLockExpired()) { + EntryValue old = this.localMap.get(key.getKey()); + if (old != null) { + old.getKey().setLocked(false); + } + } else { + EntryValue old = null; + if (insert) { + synchronized (this.mapMutex) { + old = this.localMap.put(key.getKey(), value); + } + } else { + synchronized (this.mapMutex) { + old = this.localMap.remove(key.getKey()); + value.setValue(null); + } + } + + fireMapChanged(key.getOwner(), key.getKey(), + old.getValue(), value.getValue(), entryMsg + .isExpired()); + } + } else { + if (insert) { + synchronized (this.mapMutex) { + this.localMap.put(key.getKey(), value); + } + fireMapChanged(key.getOwner(), key.getKey(), null, value + .getValue(), false); + } + } + } + } + + void processGroupMessage(String memberId,String replyId,Destination replyTo, Object payload) { + Member member = this.members.get(memberId); + if (member != null) { + fireMemberMessage(member, replyId, payload); + } + if (replyId != null) { + MapRequest result = null; + synchronized (this.messageRequests) { + result = this.messageRequests.remove(replyId); + } + if (result != null) { + result.put(replyId, payload); + } + } + } + + void handleHeartbeats(Message message) { + try { + if (message instanceof ObjectMessage) { + ObjectMessage objMsg = (ObjectMessage) message; + Member member = (Member) objMsg.getObject(); + handleHeartbeats(member); + } else { + LOG.warn("Unexpected message: " + message); + } + } catch (JMSException e) { + LOG.warn("Failed to handle heart beat", e); + } + } + + void handleHeartbeats(Member member) { + member.setTimeStamp(System.currentTimeMillis()); + if (this.members.put(member.getId(), member) == null) { + + fireMemberStarted(member); + if (!member.equals(this.local)) { + sendHeartBeat(member.getInBoxDestination()); + } + election(member, true); + synchronized (this.memberMutex) { + this.memberMutex.notifyAll(); + } + } + } + + void handleConsumerEvents(ConsumerEvent event) { + if (!event.isStarted()) { + Member member = this.members.remove(event.getConsumerId() + .toString()); + if (member != null) { + fireMemberStopped(member); + election(member, false); + } + } + } + + void checkMembership() { + if (this.started.get() && this.electionFinished.get()) { + long checkTime = System.currentTimeMillis() + - getHeartBeatInterval(); + boolean doElection = false; + for (Member member : this.members.values()) { + if (member.getTimeStamp() < checkTime) { + LOG.info("Member timestamp expired " + member); + this.members.remove(member.getId()); + fireMemberStopped(member); + doElection = true; + } + } + if (doElection) { + election(null, false); + } + } + } + + void expirationSweep() { + if (isCoordinator() && this.started.get() && this.electionFinished.get()) { + List expiredMessages = null; + List expiredLocks = null; + synchronized (this.mapMutex) { + Map> map = this.localMap; + if (map != null) { + long currentTime = System.currentTimeMillis(); + for (EntryValue value : map.values()) { + EntryKey k = value.getKey(); + if (k.isExpired(currentTime)) { + if (expiredMessages == null) { + expiredMessages = new ArrayList(); + expiredMessages.add(k); + } + }else if (k.isLockExpired(currentTime)) { + k.setLocked(false); + if (expiredLocks == null) { + expiredLocks = new ArrayList(); + expiredLocks.add(k); + } + expiredLocks.add(k); + } + } + } + } + //do the actual removal of entries in a separate thread + if (expiredMessages != null) { + final List expire = expiredMessages; + this.stateExecutor.execute(new Runnable() { + public void run() { + doMessageExpiration(expire); + } + }); + } + if (expiredLocks != null) { + final List expire = expiredLocks; + this.stateExecutor.execute(new Runnable() { + public void run() { + doLockExpiration(expire); + } + }); + } + } + + } + + void doMessageExpiration(List list) { + if (this.started.get() && this.electionFinished.get() + && isCoordinator()) { + for (EntryKey k : list) { + EntryValue old = null; + synchronized (this.mapMutex) { + old = this.localMap.remove(k.getKey()); + } + if (old != null) { + EntryMessage entryMsg = new EntryMessage(); + entryMsg.setType(EntryMessage.MessageType.DELETE); + entryMsg.setExpired(true); + entryMsg.setKey(k); + entryMsg.setValue(old.getValue()); + broadcastMapUpdate(entryMsg, ""); + fireMapChanged(k.getOwner(), k.getKey(), old.getValue(), + null, true); + } + } + } + } + + void doLockExpiration(List list) { + if (this.started.get() && this.electionFinished.get() + && isCoordinator()) { + for (EntryKey k : list) { + + EntryMessage entryMsg = new EntryMessage(); + entryMsg.setType(EntryMessage.MessageType.DELETE); + entryMsg.setLockExpired(true); + entryMsg.setKey(k); + broadcastMapUpdate(entryMsg, ""); + } + + } + } + + void sendHeartBeat() { + sendHeartBeat(this.heartBeatTopic); + } + + void sendHeartBeat(Destination destination) { + if (this.started.get()) { + try { + ObjectMessage msg = this.stateSession + .createObjectMessage(this.local); + msg.setJMSType(STATE_TYPE); + this.stateProducer.send(destination, msg); + } catch (javax.jms.IllegalStateException e) { + // ignore - as we are probably stopping + } catch (Throwable e) { + if (this.started.get()) { + LOG.warn("Failed to send heart beat", e); + } + } + } + } + + void updateNewMemberMap(Member member) { + List>> list = new ArrayList>>(); + synchronized (this.mapMutex) { + if (this.localMap != null) { + for (Map.Entry> entry : this.localMap + .entrySet()) { + list.add(entry); + } + } + } + try { + for (Map.Entry> entry : list) { + EntryMessage entryMsg = new EntryMessage(); + entryMsg.setKey(entry.getValue().getKey()); + entryMsg.setValue(entry.getValue().getValue()); + entryMsg.setType(EntryMessage.MessageType.SYNC); + entryMsg.setMapUpdate(true); + ObjectMessage objMsg = this.stateSession + .createObjectMessage(entryMsg); + if (!member.equals(entry.getValue().getKey().getOwner())) { + objMsg.setJMSType(STATE_TYPE); + this.stateProducer.send(member.getInBoxDestination(), objMsg); + } + } + } catch(javax.jms.IllegalStateException e) { + //ignore - as closing + }catch (JMSException e) { + if (started.get()) { + LOG.warn("Failed to update new member ", e); + } + } + } + + void fireMemberStarted(Member member) { + LOG.info(this.local.getName() + " Member started " + member); + for (MemberChangedListener l : this.membershipListeners) { + l.memberStarted(member); + } + } + + void fireMemberStopped(Member member) { + LOG.info(this.local.getName() + " Member stopped " + member); + for (MemberChangedListener l : this.membershipListeners) { + l.memberStopped(member); + } + // remove all entries owned by the stopped member + List> tmpList = new ArrayList>(); + boolean mapExists = false; + synchronized (this.mapMutex) { + mapExists = this.localMap != null; + if (mapExists) { + for (EntryValue value : this.localMap.values()) { + EntryKey entryKey = value.getKey(); + if (entryKey.getOwner().equals(member)) { + if (entryKey.isRemoveOnExit()) { + tmpList.add(entryKey); + } + if (entryKey.isReleaseLockOnExit()) { + entryKey.setLocked(false); + } + } + } + } + } + if (mapExists) { + for (EntryKey entryKey : tmpList) { + EntryValue value = null; + synchronized (this.mapMutex) { + value = this.localMap.remove(entryKey); + } + fireMapChanged(member, entryKey.getKey(), value.getValue(), + null,false); + } + } + } + + void fireMemberMessage(final Member member,final String replyId,final Object message) { + if (this.started.get() && this.stateExecutor != null + && !this.messageExecutor.isShutdown()) { + this.messageExecutor.execute(new Runnable() { + public void run() { + doFireMemberMessage(member,replyId,message); + } + }); + } + } + + void doFireMemberMessage(Member sender,String replyId,Object message) { + if(this.started.get()) { + for(GroupMessageListener l : this.groupMessageListeners) { + l.messageDelivered(sender,replyId, message); + } + } + } + + void fireMapChanged(final Member owner, final Object key, + final Object oldValue, final Object newValue, final boolean expired) { + if (this.started.get() && this.stateExecutor != null + && !this.stateExecutor.isShutdown()) { + this.stateExecutor.execute(new Runnable() { + public void run() { + doFireMapChanged(owner, key, oldValue, newValue, expired); + } + }); + } + } + + void doFireMapChanged(Member owner, Object key, Object oldValue, + Object newValue, boolean expired) { + if (this.started.get()) { + for (MapChangedListener l : this.mapChangedListeners) { + if (oldValue == null) { + l.mapInsert(owner, key, newValue); + } else if (newValue == null) { + l.mapRemove(owner, key, oldValue, expired); + } else { + l.mapUpdate(owner, key, oldValue, newValue); + } + } + } + } + + void election(final Member member, final boolean memberStarted) { + if (this.started.get() && this.stateExecutor != null + && !this.electionExecutor.isShutdown()) { + synchronized (this.electionExecutor) { + //remove any queued election tasks + List list = new ArrayList( + this.electionExecutor.getQueue()); + for (Runnable r : list) { + this.electionExecutor.remove(r); + } + } + this.electionExecutor.execute(new Runnable() { + public void run() { + doElection(member, memberStarted); + } + }); + } + } + + void doElection(Member member, boolean memberStarted) { + if ((member == null || !member.equals(this.local)) + && (this.electionFinished.compareAndSet(true, false)||true)) { + boolean wasCoordinator = isCoordinator() && !isEmpty(); + // call an election + while (!callElection()&&this.started.get()) + ; + if (this.started.get()) { + List members = new ArrayList(this.members.values()); + this.coordinator = selectCordinator(members); + if (isCoordinator()) { + broadcastElectionType(ElectionMessage.MessageType.COORDINATOR); + } + if (memberStarted && member != null) { + if (wasCoordinator || isCoordinator() && this.started.get()) { + updateNewMemberMap(member); + } + } + if (!this.electionFinished.get()) { + try { + synchronized (this.electionFinished) { + this.electionFinished.wait(this.heartBeatInterval * 2); + } + } catch (InterruptedException e) { + } + } + if (!this.electionFinished.get()) { + // we must be the coordinator + this.coordinator = this.local; + this.electionFinished.set(true); + broadcastElectionType(ElectionMessage.MessageType.COORDINATOR); + } + } + } + } + + boolean callElection() { + List members = new ArrayList(this.members.values()); + AsyncMapRequest request = new AsyncMapRequest(); + for (Member member : members) { + if (this.local.getId().compareTo(member.getId()) < 0) { + ElectionMessage msg = new ElectionMessage(); + msg.setMember(this.local); + msg.setType(ElectionMessage.MessageType.ELECTION); + sendAsyncStateRequest(request, member, msg); + } + } + return request.isSuccess(getHeartBeatInterval()); + } + + void processElectionMessage(ElectionMessage msg, Destination replyTo, + String correlationId) { + if (msg.isElection()) { + msg.setType(ElectionMessage.MessageType.ANSWER); + msg.setMember(this.local); + sendReply(msg, replyTo, correlationId); + } else if (msg.isCoordinator()) { + synchronized (this.electionFinished) { + this.coordinator = msg.getMember(); + this.electionFinished.set(true); + this.electionFinished.notifyAll(); + } + } + } + + void broadcastElectionType(ElectionMessage.MessageType type) { + if (started.get()) { + try { + ElectionMessage msg = new ElectionMessage(); + msg.setMember(this.local); + msg.setType(type); + ObjectMessage objMsg = this.stateSession.createObjectMessage(msg); + objMsg.setJMSType(STATE_TYPE); + this.stateProducer.send(this.stateTopic, objMsg); + } catch (javax.jms.IllegalStateException e) { + // ignore - we are stopping + } catch (JMSException e) { + if (this.started.get()) { + LOG.error("Failed to broadcast election message: " + type, + e); + } + } + } + } + + void checkStatus() throws IllegalStateException { + if (!started.get()) { + throw new IllegalStateException("GroupMap " + this.local.getName() + + " not started"); + } + waitForElection(); + } + + void waitForElection() { + synchronized (this.electionFinished) { + while (started.get() && !this.electionFinished.get()) { + try { + this.electionFinished.wait(1000); + } catch (InterruptedException e) { + stop(); + Thread.currentThread().interrupt(); + } + } + } + } + + public String toString() { + return "Group:" + getName() + "{id=" + this.local.getId() + + ",coordinator=" + isCoordinator() + ",inbox=" + + this.local.getInBoxDestination() + "}"; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java b/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java deleted file mode 100644 index 1220a6f1d8..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java +++ /dev/null @@ -1,1216 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.group; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Timer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Session; -import javax.jms.Topic; -import org.apache.activemq.ActiveMQMessageConsumer; -import org.apache.activemq.Service; -import org.apache.activemq.advisory.ConsumerEvent; -import org.apache.activemq.advisory.ConsumerEventSource; -import org.apache.activemq.advisory.ConsumerListener; -import org.apache.activemq.thread.SchedulerTimerTask; -import org.apache.activemq.util.IdGenerator; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - *

- * A GroupMap is a Map implementation that is used to shared state - * amongst a distributed group of other GroupMap instances. - * Membership of a group is handled automatically using discovery. - *

- * The underlying transport is JMS and there are some optimizations that occur - * for membership if used with ActiveMQ - but GroupMap can be used - * with any JMS implementation. - * - *

- * Updates to the group shared map are controlled by a coordinator. The - * coordinator is elected by the member with the lowest lexicographical id - - * based on the bully algorithm [Silberschatz et al. 1993] - *

- * The {@link #selectCordinator(Collection members)} method may be - * overridden to implement a custom mechanism for choosing how the coordinator - * is elected for the map. - *

- * New GroupMap instances have their state updated by the coordinator, - * and coordinator failure is handled automatically within the group. - *

- * All updates are totally ordered through the coordinator, whilst read operations - * happen locally. - *

- * A GroupMapsupports the concept of owner only updates(write locks), - * shared updates, entry expiration times and removal on owner exit - - * all of which are optional. - * - *

- * - * @param the key type - * @param the value type - * - */ -public class GroupMap implements Map, Service { - /** - * default interval within which to detect a member failure - */ - public static final long DEFAULT_HEART_BEAT_INTERVAL = 2000; - private static final long EXPIRATION_SWEEP_INTERVAL = 1000; - private static final Log LOG = LogFactory.getLog(GroupMap.class); - private static final String STATE_TOPIC_PREFIX = GroupMap.class.getName() - + "."; - private final Object mapMutex = new Object(); - private Map, EntryValue> localMap; - private Map members = new ConcurrentHashMap(); - private Map requests = new HashMap(); - private List membershipListeners = new CopyOnWriteArrayList(); - private List mapChangedListeners = new CopyOnWriteArrayList(); - Member local; - private Member coordinator; - private String groupName; - private boolean sharedWrites; - private Connection connection; - private Session session; - private Topic topic; - private Topic heartBeatTopic; - private Topic inboxTopic; - private MessageProducer producer; - private ConsumerEventSource consumerEvents; - private AtomicBoolean started = new AtomicBoolean(); - private SchedulerTimerTask heartBeatTask; - private SchedulerTimerTask checkMembershipTask; - private SchedulerTimerTask expirationTask; - private Timer timer; - private long heartBeatInterval = DEFAULT_HEART_BEAT_INTERVAL; - private IdGenerator idGenerator = new IdGenerator(); - private boolean removeOwnedObjectsOnExit; - private int timeToLive; - private int minimumGroupSize = 1; - private final AtomicBoolean electionFinished = new AtomicBoolean(true); - private ExecutorService executor; - private final Object memberMutex = new Object(); - - /** - * @param connection - * @param name - */ - public GroupMap(Connection connection, String name) { - this(connection, "default", name); - } - - /** - * @param connection - * @param groupName - * @param name - */ - public GroupMap(Connection connection, String groupName, String name) { - this.connection = connection; - this.local = new Member(name); - this.coordinator = this.local; - this.groupName = groupName; - } - - /** - * Set the local map implementation to be used By default its a HashMap - - * but you could use a Cache for example - * - * @param map - */ - public void setLocalMap(Map map) { - synchronized (this.mapMutex) { - this.localMap = map; - } - } - - /** - * Start membership to the group - * - * @throws Exception - * - */ - public void start() throws Exception { - if (this.started.compareAndSet(false, true)) { - synchronized (this.mapMutex) { - if (this.localMap == null) { - this.localMap = new HashMap, EntryValue>(); - } - } - this.connection.start(); - this.session = this.connection.createSession(false, - Session.AUTO_ACKNOWLEDGE); - this.producer = this.session.createProducer(null); - this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - this.inboxTopic = this.session.createTemporaryTopic(); - String topicName = STATE_TOPIC_PREFIX + this.groupName; - this.topic = this.session.createTopic(topicName); - this.heartBeatTopic = this.session.createTopic(topicName - + ".heartbeat"); - MessageConsumer privateInbox = this.session - .createConsumer(this.inboxTopic); - privateInbox.setMessageListener(new MessageListener() { - public void onMessage(Message message) { - processMessage(message); - } - }); - MessageConsumer mapChangeConsumer = this.session - .createConsumer(this.topic); - mapChangeConsumer.setMessageListener(new MessageListener() { - public void onMessage(Message message) { - processMessage(message); - } - }); - MessageConsumer heartBeatConsumer = this.session - .createConsumer(this.heartBeatTopic); - heartBeatConsumer.setMessageListener(new MessageListener() { - public void onMessage(Message message) { - handleHeartbeats(message); - } - }); - this.consumerEvents = new ConsumerEventSource(this.connection, - this.topic); - this.consumerEvents.setConsumerListener(new ConsumerListener() { - public void onConsumerEvent(ConsumerEvent event) { - handleConsumerEvents(event); - } - }); - this.consumerEvents.start(); - String memberId = null; - if (mapChangeConsumer instanceof ActiveMQMessageConsumer) { - memberId = ((ActiveMQMessageConsumer) mapChangeConsumer) - .getConsumerId().toString(); - } else { - memberId = this.idGenerator.generateId(); - } - this.local.setId(memberId); - this.local.setInBoxDestination(this.inboxTopic); - this.executor = Executors - .newSingleThreadExecutor(new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "Election{" - + GroupMap.this.local + "}"); - thread.setDaemon(true); - thread.setPriority(Thread.NORM_PRIORITY); - return thread; - } - }); - sendHeartBeat(); - this.heartBeatTask = new SchedulerTimerTask(new Runnable() { - public void run() { - sendHeartBeat(); - } - }); - this.checkMembershipTask = new SchedulerTimerTask(new Runnable() { - public void run() { - checkMembership(); - } - }); - - this.expirationTask = new SchedulerTimerTask(new Runnable() { - public void run() { - expirationSweep(); - } - }); - this.timer = new Timer("Distributed heart beat", true); - this.timer.scheduleAtFixedRate(this.heartBeatTask, - getHeartBeatInterval() / 3, getHeartBeatInterval() / 2); - this.timer.scheduleAtFixedRate(this.checkMembershipTask, - getHeartBeatInterval(), getHeartBeatInterval()); - this.timer.scheduleAtFixedRate(this.expirationTask, - EXPIRATION_SWEEP_INTERVAL, EXPIRATION_SWEEP_INTERVAL); - // await for members to join - long timeout = this.heartBeatInterval * this.minimumGroupSize; - long deadline = System.currentTimeMillis() + timeout; - while (this.members.size() < this.minimumGroupSize && timeout > 0) { - synchronized (this.memberMutex) { - this.memberMutex.wait(timeout); - } - timeout = Math.max(deadline - System.currentTimeMillis(), 0); - } - } - } - - /** - * stop membership to the group - * - * @throws Exception - */ - public void stop() { - if (this.started.compareAndSet(true, false)) { - this.expirationTask.cancel(); - this.checkMembershipTask.cancel(); - this.heartBeatTask.cancel(); - this.expirationTask.cancel(); - this.timer.purge(); - if (this.executor != null) { - this.executor.shutdownNow(); - } - try { - this.consumerEvents.stop(); - this.session.close(); - } catch (Exception e) { - LOG.debug("Caught exception stopping", e); - } - } - } - - /** - * @return true if there is elections have finished - */ - public boolean isElectionFinished() { - return this.electionFinished.get(); - } - - /** - * @return the partitionName - */ - public String getGroupName() { - return this.groupName; - } - - /** - * @return the name ofthis map - */ - public String getName() { - return this.local.getName(); - } - - /** - * @return the sharedWrites - */ - public boolean isSharedWrites() { - return this.sharedWrites; - } - - /** - * @param sharedWrites - * the sharedWrites to set - */ - public void setSharedWrites(boolean sharedWrites) { - this.sharedWrites = sharedWrites; - } - - /** - * @return the heartBeatInterval - */ - public long getHeartBeatInterval() { - return this.heartBeatInterval; - } - - /** - * @param heartBeatInterval - * the heartBeatInterval to set - */ - public void setHeartBeatInterval(long heartBeatInterval) { - this.heartBeatInterval = heartBeatInterval; - } - - /** - * @param l - */ - public void addMemberChangedListener(MemberChangedListener l) { - this.membershipListeners.add(l); - } - - /** - * @param l - */ - public void removeMemberChangedListener(MemberChangedListener l) { - this.membershipListeners.remove(l); - } - - /** - * @param l - */ - public void addMapChangedListener(MapChangedListener l) { - this.mapChangedListeners.add(l); - } - - /** - * @param l - */ - public void removeMapChangedListener(MapChangedListener l) { - this.mapChangedListeners.remove(l); - } - - /** - * @return the timeToLive - */ - public int getTimeToLive() { - return this.timeToLive; - } - - /** - * @param timeToLive - * the timeToLive to set - */ - public void setTimeToLive(int timeToLive) { - this.timeToLive = timeToLive; - } - - /** - * @return the removeOwnedObjectsOnExit - */ - public boolean isRemoveOwnedObjectsOnExit() { - return this.removeOwnedObjectsOnExit; - } - - /** - * Sets the policy for owned objects in the group If set to true, when this - * GroupMap stops, - * any objects it owns will be removed from the group map - * @param removeOwnedObjectsOnExit the removeOwnedObjectsOnExit to set - */ - public void setRemoveOwnedObjectsOnExit(boolean removeOwnedObjectsOnExit) { - this.removeOwnedObjectsOnExit = removeOwnedObjectsOnExit; - } - - /** - * @return the minimumGroupSize - */ - public int getMinimumGroupSize() { - return this.minimumGroupSize; - } - - /** - * @param minimumGroupSize - * the minimumGroupSize to set - */ - public void setMinimumGroupSize(int minimumGroupSize) { - this.minimumGroupSize = minimumGroupSize; - } - - /** - * clear entries from the Map - * - * @throws IllegalStateException - */ - public void clear() throws IllegalStateException { - checkStatus(); - if (this.localMap != null && !this.localMap.isEmpty()) { - Set> keys = null; - synchronized (this.mapMutex) { - keys = new HashSet>(this.localMap.keySet()); - } - for (EntryKey key : keys) { - remove(key); - } - } - this.localMap.clear(); - } - - public boolean containsKey(Object key) { - EntryKey stateKey = new EntryKey(this.local, key); - synchronized (this.mapMutex) { - return this.localMap != null ? this.localMap.containsKey(stateKey) - : false; - } - } - - public boolean containsValue(Object value) { - EntryValue entryValue = new EntryValue(this.local, value); - synchronized (this.mapMutex) { - return this.localMap != null ? this.localMap - .containsValue(entryValue) : false; - } - } - - public Set> entrySet() { - Map result = new HashMap(); - synchronized (this.mapMutex) { - if (this.localMap != null) { - for (java.util.Map.Entry, EntryValue> entry : this.localMap - .entrySet()) { - result.put(entry.getKey().getKey(), entry.getValue() - .getValue()); - } - } - } - return result.entrySet(); - } - - public V get(Object key) { - EntryKey stateKey = new EntryKey(this.local, (K) key); - EntryValue value = null; - synchronized (this.mapMutex) { - value = this.localMap != null ? this.localMap.get(stateKey) : null; - } - return value != null ? value.getValue() : null; - } - - public boolean isEmpty() { - synchronized (this.mapMutex) { - return this.localMap != null ? this.localMap.isEmpty() : true; - } - } - - public Set keySet() { - Set result = new HashSet(); - synchronized (this.mapMutex) { - if (this.localMap != null) { - for (EntryKey key : this.localMap.keySet()) { - result.add(key.getKey()); - } - } - } - return result; - } - - /** - * Puts an value into the map associated with the key - * - * @param key - * @param value - * @return the old value or null - * @throws GroupMapUpdateException - * @throws IllegalStateException - * - */ - public V put(K key, V value) throws GroupMapUpdateException, - IllegalStateException { - return put(key, value, isSharedWrites(), isRemoveOwnedObjectsOnExit(), - getTimeToLive()); - } - - /** - * Puts an value into the map associated with the key - * - * @param key - * @param value - * @param sharedWrites - * @param removeOnExit - * @param timeToLive - * @return the old value or null - * @throws GroupMapUpdateException - * @throws IllegalStateException - * - */ - public V put(K key, V value, boolean sharedWrites, boolean removeOnExit, - long timeToLive) throws GroupMapUpdateException, - IllegalStateException { - checkStatus(); - EntryKey entryKey = new EntryKey(this.local, key); - EntryValue stateValue = new EntryValue(this.local, value); - entryKey.setShare(sharedWrites); - entryKey.setRemoveOnExit(removeOnExit); - entryKey.setTimeToLive(timeToLive); - EntryMessage entryMsg = new EntryMessage(); - entryMsg.setKey(entryKey); - entryMsg.setValue(value); - entryMsg.setType(EntryMessage.MessageType.INSERT); - return (V) sendRequest(getCoordinator(), entryMsg); - } - - /** - * Add the Map to the distribution - * - * @param t - * @throws GroupMapUpdateException - * @throws IllegalStateException - */ - public void putAll(Map t) - throws GroupMapUpdateException, IllegalStateException { - putAll(t, isSharedWrites(), isRemoveOwnedObjectsOnExit(), - getTimeToLive()); - } - - /** - * Add the Map to the distribution - * - * @param t - * @param sharedWrites - * @param removeOnExit - * @param timeToLive - * @throws GroupMapUpdateException - * @throws IllegalStateException - */ - public void putAll(Map t, boolean sharedWrites, - boolean removeOnExit, long timeToLive) - throws GroupMapUpdateException, IllegalStateException { - for (java.util.Map.Entry entry : t.entrySet()) { - put(entry.getKey(), entry.getValue(), sharedWrites, removeOnExit, - timeToLive); - } - } - - /** - * remove a value from the map associated with the key - * - * @param key - * @return the Value or null - * @throws GroupMapUpdateException - * @throws IllegalStateException - * - */ - public V remove(Object key) throws GroupMapUpdateException, - IllegalStateException { - EntryKey entryKey = new EntryKey(this.local, (K) key); - return remove(entryKey); - } - - V remove(EntryKey key) throws GroupMapUpdateException, - IllegalStateException { - checkStatus(); - EntryMessage entryMsg = new EntryMessage(); - entryMsg.setKey(key); - entryMsg.setType(EntryMessage.MessageType.DELETE); - return (V) sendRequest(getCoordinator(), entryMsg); - } - - public int size() { - synchronized (this.mapMutex) { - return this.localMap != null ? this.localMap.size() : 0; - } - } - - public Collection values() { - List result = new ArrayList(); - synchronized (this.mapMutex) { - if (this.localMap != null) { - for (EntryValue value : this.localMap.values()) { - result.add(value.getValue()); - } - } - } - return result; - } - - /** - * @return a set of the members - */ - public Set members() { - Set result = new HashSet(); - result.addAll(this.members.values()); - return result; - } - - /** - * @param key - * @return true if this is the owner of the key - */ - public boolean isOwner(K key) { - EntryKey stateKey = new EntryKey(this.local, key); - EntryValue entryValue = null; - synchronized (this.mapMutex) { - entryValue = this.localMap != null ? this.localMap.get(stateKey) - : null; - } - boolean result = false; - if (entryValue != null) { - result = entryValue.getOwner().getId().equals(this.local.getId()); - } - return result; - } - - /** - * Get the owner of a key - * - * @param key - * @return the owner - or null if the key doesn't exist - */ - public Member getOwner(K key) { - EntryKey stateKey = new EntryKey(this.local, key); - EntryValue entryValue = null; - synchronized (this.mapMutex) { - entryValue = this.localMap != null ? this.localMap.get(stateKey) - : null; - } - return entryValue != null ? entryValue.getOwner() : null; - } - - /** - * @return true if the coordinator for the map - */ - public boolean isCoordinator() { - return this.local.equals(this.coordinator); - } - - /** - * @return the coordinator - */ - public Member getCoordinator() { - return this.coordinator; - } - - /** - * Select a coordinator - by default, its the member with the lowest - * lexicographical id - * - * @param members - * @return - */ - protected Member selectCordinator(Collection members) { - Member result = this.local; - for (Member member : members) { - if (result.getId().compareTo(member.getId()) < 0) { - result = member; - } - } - return result; - } - - Object sendRequest(Member member, Serializable payload) { - Object result = null; - MapRequest request = new MapRequest(); - String id = this.idGenerator.generateId(); - synchronized (this.requests) { - this.requests.put(id, request); - } - try { - ObjectMessage objMsg = this.session.createObjectMessage(payload); - objMsg.setJMSReplyTo(this.inboxTopic); - objMsg.setJMSCorrelationID(id); - this.producer.send(member.getInBoxDestination(), objMsg); - result = request.get(getHeartBeatInterval() * 200000); - } catch (JMSException e) { - if (this.started.get()) { - LOG.error("Failed to send request " + payload, e); - } - } - if (result instanceof GroupMapUpdateException) { - throw (GroupMapUpdateException) result; - } - return result; - } - - void sendAsyncRequest(AsyncMapRequest asyncRequest, Member member, - Serializable payload) { - MapRequest request = new MapRequest(); - String id = this.idGenerator.generateId(); - asyncRequest.add(id, request); - synchronized (this.requests) { - this.requests.put(id, request); - } - try { - ObjectMessage objMsg = this.session.createObjectMessage(payload); - objMsg.setJMSReplyTo(this.inboxTopic); - objMsg.setJMSCorrelationID(id); - this.producer.send(member.getInBoxDestination(), objMsg); - } catch (JMSException e) { - if (this.started.get()) { - LOG.error("Failed to send async request " + payload, e); - } - } - } - - void sendReply(Object reply, Destination replyTo, String id) { - try { - ObjectMessage replyMsg = this.session - .createObjectMessage((Serializable) reply); - replyMsg.setJMSCorrelationID(id); - this.producer.send(replyTo, replyMsg); - } catch (JMSException e) { - LOG.error("Couldn't send reply from co-ordinator", e); - } - } - - void broadcastMapUpdate(EntryMessage entry, String correlationId) { - try { - EntryMessage copy = entry.copy(); - copy.setMapUpdate(true); - ObjectMessage objMsg = this.session.createObjectMessage(copy); - objMsg.setJMSCorrelationID(correlationId); - this.producer.send(this.topic, objMsg); - } catch (JMSException e) { - if (this.started.get()) { - LOG.error("Failed to send EntryMessage " + entry, e); - } - } - } - - void processMessage(Message message) { - if (message instanceof ObjectMessage) { - ObjectMessage objMsg = (ObjectMessage) message; - try { - String id = objMsg.getJMSCorrelationID(); - Destination replyTo = objMsg.getJMSReplyTo(); - Object payload = objMsg.getObject(); - if (payload instanceof Member) { - handleHeartbeats((Member) payload); - } else if (payload instanceof EntryMessage) { - EntryMessage entryMsg = (EntryMessage) payload; - entryMsg = entryMsg.copy(); - if (entryMsg.isMapUpdate()) { - processMapUpdate(entryMsg); - } else { - processEntryMessage(entryMsg, replyTo, id); - } - } else if (payload instanceof ElectionMessage) { - ElectionMessage electionMsg = (ElectionMessage) payload; - electionMsg = electionMsg.copy(); - processElectionMessage(electionMsg, replyTo, id); - } - if (id != null) { - MapRequest result = null; - synchronized (this.requests) { - result = this.requests.remove(id); - } - if (result != null) { - result.put(id, objMsg.getObject()); - } - } - } catch (JMSException e) { - LOG.warn("Failed to process reply: " + message, e); - } - } - } - - void processEntryMessage(EntryMessage entryMsg, Destination replyTo, - String correlationId) { - if (isCoordinator()) { - EntryKey key = entryMsg.getKey(); - EntryValue value = new EntryValue(key.getOwner(), - (V) entryMsg.getValue()); - boolean insert = entryMsg.isInsert(); - boolean containsKey = false; - synchronized (this.mapMutex) { - containsKey = this.localMap.containsKey(key); - } - if (containsKey) { - Member owner = getOwner((K) key.getKey()); - if (owner.equals(key.getOwner()) || key.isShare()) { - EntryValue old = null; - if (insert) { - synchronized (this.mapMutex) { - old = this.localMap.put(key, value); - } - } else { - synchronized (this.mapMutex) { - old = this.localMap.remove(key); - } - } - broadcastMapUpdate(entryMsg, correlationId); - fireMapChanged(owner, key.getKey(), old.getValue(), value - .getValue(), false); - } else { - Serializable reply = new GroupMapUpdateException( - "Owned by " + owner); - sendReply(reply, replyTo, correlationId); - } - } else { - if (insert) { - synchronized (this.mapMutex) { - this.localMap.put(key, value); - } - broadcastMapUpdate(entryMsg, correlationId); - fireMapChanged(key.getOwner(), key.getKey(), null, value - .getValue(), false); - } else { - sendReply(null, replyTo, correlationId); - } - } - } - } - - void processMapUpdate(EntryMessage entryMsg) { - boolean containsKey = false; - EntryKey key = entryMsg.getKey(); - EntryValue value = new EntryValue(key.getOwner(), (V) entryMsg - .getValue()); - boolean insert = entryMsg.isInsert()||entryMsg.isSync(); - synchronized (this.mapMutex) { - containsKey = this.localMap.containsKey(key); - } - - if (!isCoordinator()||entryMsg.isSync()) { - if (containsKey) { - Member owner = getOwner((K) key.getKey()); - EntryValue old = null; - if (insert) { - synchronized (this.mapMutex) { - old = this.localMap.put(key, value); - } - } else { - synchronized (this.mapMutex) { - old = this.localMap.remove(key); - value.setValue(null); - } - } - fireMapChanged(owner, key.getKey(), old.getValue(), value - .getValue(), entryMsg.isExpired()); - } else { - if (insert) { - synchronized (this.mapMutex) { - this.localMap.put(key, value); - } - fireMapChanged(key.getOwner(), key.getKey(), null, value - .getValue(), false); - } - } - } - } - - void handleHeartbeats(Message message) { - try { - if (message instanceof ObjectMessage) { - ObjectMessage objMsg = (ObjectMessage) message; - Member member = (Member) objMsg.getObject(); - handleHeartbeats(member); - } else { - LOG.warn("Unexpected message: " + message); - } - } catch (JMSException e) { - LOG.warn("Failed to handle heart beat", e); - } - } - - void handleHeartbeats(Member member) { - member.setTimeStamp(System.currentTimeMillis()); - if (this.members.put(member.getId(), member) == null) { - election(member, true); - fireMemberStarted(member); - if (!member.equals(this.local)) { - sendHeartBeat(member.getInBoxDestination()); - } - synchronized (this.memberMutex) { - this.memberMutex.notifyAll(); - } - } - } - - void handleConsumerEvents(ConsumerEvent event) { - if (!event.isStarted()) { - Member member = this.members.remove(event.getConsumerId() - .toString()); - if (member != null) { - fireMemberStopped(member); - election(member, false); - } - } - } - - void checkMembership() { - if (this.started.get() && this.electionFinished.get()) { - long checkTime = System.currentTimeMillis() - - getHeartBeatInterval(); - boolean doElection = false; - for (Member member : this.members.values()) { - if (member.getTimeStamp() < checkTime) { - LOG.info("Member timestamp expired " + member); - this.members.remove(member.getId()); - fireMemberStopped(member); - doElection = true; - } - } - if (doElection) { - election(null, false); - } - } - } - - void expirationSweep() { - if (isCoordinator() && this.started.get() && this.electionFinished.get()) { - List list = null; - synchronized (this.mapMutex) { - Map, EntryValue> map = this.localMap; - if (map != null) { - long currentTime = System.currentTimeMillis(); - for (EntryKey k : map.keySet()) { - if (k.isExpired(currentTime)) { - if (list == null) { - list = new ArrayList(); - list.add(k); - } - } - } - } - } - //do the actual removal of entries in a separate thread - if (list != null) { - final List expire = list; - this.executor.execute(new Runnable() { - public void run() { - doExpiration(expire); - } - }); - } - } - - } - - void doExpiration(List list) { - if (this.started.get() && this.electionFinished.get() - && isCoordinator()) { - for (EntryKey k : list) { - EntryValue old = null; - synchronized (this.mapMutex) { - old = this.localMap.remove(k); - } - if (old != null) { - EntryMessage entryMsg = new EntryMessage(); - entryMsg.setType(EntryMessage.MessageType.DELETE); - entryMsg.setExpired(true); - entryMsg.setKey(k); - entryMsg.setValue(old.getValue()); - broadcastMapUpdate(entryMsg, ""); - fireMapChanged(k.getOwner(), k.getKey(), old.getValue(), - null, true); - } - } - } - } - - void sendHeartBeat() { - sendHeartBeat(this.heartBeatTopic); - } - - void sendHeartBeat(Destination destination) { - if (this.started.get()) { - try { - ObjectMessage msg = this.session - .createObjectMessage(this.local); - this.producer.send(destination, msg); - } catch (javax.jms.IllegalStateException e) { - // ignore - as we are probably stopping - } catch (Throwable e) { - if (this.started.get()) { - LOG.warn("Failed to send heart beat", e); - } - } - } - } - - void updateNewMemberMap(Member member) { - List, EntryValue>> list = new ArrayList, EntryValue>>(); - synchronized (this.mapMutex) { - if (this.localMap != null) { - for (Map.Entry, EntryValue> entry : this.localMap - .entrySet()) { - list.add(entry); - } - } - } - try { - for (Map.Entry, EntryValue> entry : list) { - EntryMessage entryMsg = new EntryMessage(); - entryMsg.setKey(entry.getKey()); - entryMsg.setValue(entry.getValue().getValue()); - entryMsg.setType(EntryMessage.MessageType.SYNC); - entryMsg.setMapUpdate(true); - ObjectMessage objMsg = this.session - .createObjectMessage(entryMsg); - if (!member.equals(entry.getKey().getOwner())) { - this.producer.send(member.getInBoxDestination(), objMsg); - } - } - } catch (JMSException e) { - if (started.get()) { - LOG.warn("Failed to update new member ", e); - } - } - } - - void fireMemberStarted(Member member) { - LOG.info(this.local.getName() + " Member started " + member); - for (MemberChangedListener l : this.membershipListeners) { - l.memberStarted(member); - } - } - - void fireMemberStopped(Member member) { - LOG.info(this.local.getName() + " Member stopped " + member); - for (MemberChangedListener l : this.membershipListeners) { - l.memberStopped(member); - } - // remove all entries owned by the stopped member - List> tmpList = new ArrayList>(); - boolean mapExists = false; - synchronized (this.mapMutex) { - mapExists = this.localMap != null; - if (mapExists) { - for (EntryKey entryKey : this.localMap.keySet()) { - if (entryKey.getOwner().equals(member)) { - if (entryKey.isRemoveOnExit()) { - tmpList.add(entryKey); - } - } - } - } - } - if (mapExists) { - for (EntryKey entryKey : tmpList) { - EntryValue value = null; - synchronized (this.mapMutex) { - value = this.localMap.remove(entryKey); - } - fireMapChanged(member, entryKey.getKey(), value.getValue(), - null,false); - } - } - } - - void fireMapChanged(final Member owner, final Object key, - final Object oldValue, final Object newValue, final boolean expired) { - if (this.started.get() && this.executor != null - && !this.executor.isShutdown()) { - this.executor.execute(new Runnable() { - public void run() { - doFireMapChanged(owner, key, oldValue, newValue, expired); - } - }); - } - } - - void doFireMapChanged(Member owner, Object key, Object oldValue, - Object newValue, boolean expired) { - for (MapChangedListener l : this.mapChangedListeners) { - if (oldValue == null) { - l.mapInsert(owner, key, newValue); - } else if (newValue == null) { - l.mapRemove(owner, key, oldValue, expired); - } else { - l.mapUpdate(owner, key, oldValue, newValue); - } - } - } - - void election(final Member member, final boolean memberStarted) { - if (this.started.get() && this.executor != null - && !this.executor.isShutdown()) { - this.executor.execute(new Runnable() { - public void run() { - doElection(member, memberStarted); - } - }); - } - } - - void doElection(Member member, boolean memberStarted) { - if ((member == null || !member.equals(this.local)) - && this.electionFinished.compareAndSet(true, false)) { - boolean wasCoordinator = isCoordinator() && !isEmpty(); - // call an election - while (!callElection()) - ; - List members = new ArrayList(this.members.values()); - this.coordinator = selectCordinator(members); - if (isCoordinator()) { - broadcastElectionType(ElectionMessage.MessageType.COORDINATOR); - } - if (memberStarted && member != null) { - if (wasCoordinator || isCoordinator() && this.started.get()) { - updateNewMemberMap(member); - } - } - if (!this.electionFinished.get()) { - try { - synchronized (this.electionFinished) { - this.electionFinished.wait(this.heartBeatInterval * 2); - } - } catch (InterruptedException e) { - } - } - if (!this.electionFinished.get()) { - // we must be the coordinator - this.coordinator = this.local; - this.electionFinished.set(true); - broadcastElectionType(ElectionMessage.MessageType.COORDINATOR); - } - } - } - - boolean callElection() { - List members = new ArrayList(this.members.values()); - AsyncMapRequest request = new AsyncMapRequest(); - for (Member member : members) { - if (this.local.getId().compareTo(member.getId()) < 0) { - ElectionMessage msg = new ElectionMessage(); - msg.setMember(this.local); - msg.setType(ElectionMessage.MessageType.ELECTION); - sendAsyncRequest(request, member, msg); - } - } - return request.isSuccess(getHeartBeatInterval()); - } - - void processElectionMessage(ElectionMessage msg, Destination replyTo, - String correlationId) { - if (msg.isElection()) { - msg.setType(ElectionMessage.MessageType.ANSWER); - msg.setMember(this.local); - sendReply(msg, replyTo, correlationId); - } else if (msg.isCoordinator()) { - synchronized (this.electionFinished) { - this.coordinator = msg.getMember(); - this.electionFinished.set(true); - this.electionFinished.notifyAll(); - } - } - } - - void broadcastElectionType(ElectionMessage.MessageType type) { - if (started.get()) { - try { - ElectionMessage msg = new ElectionMessage(); - msg.setMember(this.local); - msg.setType(type); - ObjectMessage objMsg = this.session.createObjectMessage(msg); - this.producer.send(this.topic, objMsg); - } catch (javax.jms.IllegalStateException e) { - // ignore - we are stopping - } catch (JMSException e) { - if (this.started.get()) { - LOG.error("Failed to broadcast election message: " + type, - e); - } - } - } - } - - void checkStatus() throws IllegalStateException { - if (!started.get()) { - throw new IllegalStateException("GroupMap " + this.local.getName() - + " not started"); - } - waitForElection(); - } - - void waitForElection() { - synchronized (this.electionFinished) { - while (started.get() && !this.electionFinished.get()) { - try { - this.electionFinished.wait(1000); - } catch (InterruptedException e) { - stop(); - Thread.currentThread().interrupt(); - } - } - } - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/group/GroupMessageListener.java b/activemq-core/src/main/java/org/apache/activemq/group/GroupMessageListener.java new file mode 100644 index 0000000000..b3fb64bec1 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/group/GroupMessageListener.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.group; + + +/** + * A listener for message communication + * + */ +public interface GroupMessageListener { + + /** + * Called when a message is delivered to the Group from another member + * @param sender the member who sent the message + * @param replyId the id to use to respond to a message + * @param message the message object + */ + void messageDelivered(Member sender, String replyId, Object message); +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/group/Member.java b/activemq-core/src/main/java/org/apache/activemq/group/Member.java index f5905b0a92..b0b2b68031 100644 --- a/activemq-core/src/main/java/org/apache/activemq/group/Member.java +++ b/activemq-core/src/main/java/org/apache/activemq/group/Member.java @@ -22,6 +22,7 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import javax.jms.Destination; import org.apache.activemq.util.IdGenerator; +import com.sun.jndi.url.corbaname.corbanameURLContextFactory; /** *

@@ -32,9 +33,10 @@ public class Member implements Externalizable { private String name; private String id; private String hostname; - private long timeStamp; private long startTime; + private int coordinatorWeight; private Destination inBoxDestination; + private transient long timeStamp; /** @@ -94,12 +96,41 @@ public class Member implements Externalizable { this.inBoxDestination=dest; } + /** + * @return the timeStamp + */ + long getTimeStamp() { + return this.timeStamp; + } + + /** + * @param timeStamp the timeStamp to set + */ + void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + /** + * @return the coordinatorWeight + */ + public int getCoordinatorWeight() { + return this.coordinatorWeight; + } + /** + * @param coordinatorWeight the coordinatorWeight to set + */ + public void setCoordinatorWeight(int coordinatorWeight) { + this.coordinatorWeight = coordinatorWeight; + } + + + public String toString() { return this.name+"["+this.id+"]@"+this.hostname; } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + this.coordinatorWeight=in.readInt();; this.name = in.readUTF(); this.id = in.readUTF(); this.hostname = in.readUTF(); @@ -108,6 +139,7 @@ public class Member implements Externalizable { } public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(this.coordinatorWeight); out.writeUTF(this.name != null ? this.name : ""); out.writeUTF(this.id != null ? this.id : ""); out.writeUTF(this.hostname != null ? this.hostname : ""); @@ -127,20 +159,4 @@ public class Member implements Externalizable { } return result; } - - /** - * @return the timeStamp - */ - long getTimeStamp() { - return this.timeStamp; - } - - /** - * @param timeStamp the timeStamp to set - */ - void setTimeStamp(long timeStamp) { - this.timeStamp = timeStamp; - } - - } diff --git a/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java b/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java index 3a59ac56a4..a5c9ee6385 100644 --- a/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java +++ b/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java @@ -18,7 +18,7 @@ package org.apache.activemq.group; /** - * @author rajdavies + * A listener for membership changes to a group * */ public interface MemberChangedListener { diff --git a/activemq-core/src/main/java/org/apache/activemq/group/package.html b/activemq-core/src/main/java/org/apache/activemq/group/package.html index ac31fc98b2..d4af9c3fd1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/group/package.html +++ b/activemq-core/src/main/java/org/apache/activemq/group/package.html @@ -19,7 +19,7 @@ -Shared state and membership information between members of a remote group +Shared state, messaging and membership information between members of a distributed group diff --git a/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java b/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java index 40e1779bb8..74be0ae846 100644 --- a/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java @@ -34,12 +34,12 @@ public class GroupMapTest extends TestCase { /** * Test method for - * {@link org.apache.activemq.group.GroupMap#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}. + * {@link org.apache.activemq.group.Group#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}. * @throws Exception */ public void testAddMemberChangedListener() throws Exception { final AtomicInteger counter = new AtomicInteger(); - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); map1.addMemberChangedListener(new MemberChangedListener(){ public void memberStarted(Member member) { @@ -65,7 +65,7 @@ public class GroupMapTest extends TestCase { } } assertEquals(1, counter.get()); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.start(); synchronized(counter) { if (counter.get()<2) { @@ -76,7 +76,7 @@ public class GroupMapTest extends TestCase { map2.stop(); synchronized(counter) { if (counter.get()>=2) { - counter.wait(GroupMap.DEFAULT_HEART_BEAT_INTERVAL*3); + counter.wait(Group.DEFAULT_HEART_BEAT_INTERVAL*3); } } assertEquals(1, counter.get()); @@ -85,14 +85,14 @@ public class GroupMapTest extends TestCase { /** * Test method for - * {@link org.apache.activemq.group.GroupMap#addMapChangedListener(org.apache.activemq.group.MapChangedListener)}. + * {@link org.apache.activemq.group.Group#addMapChangedListener(org.apache.activemq.group.MapChangedListener)}. * @throws Exception */ public void testAddMapChangedListener() throws Exception { final AtomicBoolean called1 = new AtomicBoolean(); final AtomicBoolean called2 = new AtomicBoolean(); - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); map1.addMapChangedListener(new DefaultMapChangedListener() { public void mapInsert(Member owner,Object Key, Object Value) { @@ -104,7 +104,7 @@ public class GroupMapTest extends TestCase { }); map1.start(); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.addMapChangedListener(new DefaultMapChangedListener() { public void mapInsert(Member owner,Object Key, Object Value) { @@ -133,12 +133,13 @@ public class GroupMapTest extends TestCase { map1.stop(); map2.stop(); } - - public void testGetWriteLock() throws Exception { - GroupMap map1 = new GroupMap(connection1, "map1"); + + public void testGetImplicitWriteLock() throws Exception { + Group map1 = new Group(connection1, "map1"); final AtomicBoolean called = new AtomicBoolean(); map1.start(); - GroupMap map2 = new GroupMap(connection2, "map2"); + Group map2 = new Group(connection2, "map2"); + map2.setAlwaysLock(true); map2.setMinimumGroupSize(2); map2.start(); map2.put("test", "foo"); @@ -150,15 +151,84 @@ public class GroupMapTest extends TestCase { map1.stop(); map2.stop(); } - + + public void testExpireImplicitWriteLock() throws Exception { + Group map1 = new Group(connection1, "map1"); + final AtomicBoolean called = new AtomicBoolean(); + map1.start(); + Group map2 = new Group(connection2, "map2"); + map2.setAlwaysLock(true); + map2.setLockTimeToLive(1000); + map2.setMinimumGroupSize(2); + map2.start(); + map2.put("test", "foo"); + try { + map1.put("test", "bah"); + fail("Should have thrown an exception!"); + } catch (GroupMapUpdateException e) { + } + Thread.sleep(2000); + map1.put("test", "bah"); + map1.stop(); + map2.stop(); + } + + public void testExpireImplicitLockOnExit() throws Exception { + Group map1 = new Group(connection1, "map1"); + final AtomicBoolean called = new AtomicBoolean(); + map1.start(); + Group map2 = new Group(connection2, "map2"); + map2.setAlwaysLock(true); + map2.setMinimumGroupSize(2); + map2.start(); + map2.put("test", "foo"); + try { + map1.put("test", "bah"); + fail("Should have thrown an exception!"); + } catch (GroupMapUpdateException e) { + } + map2.stop(); + map1.put("test", "bah"); + map1.stop(); + + } + + public void testGetExplicitWriteLock() throws Exception { + Group map1 = new Group(connection1, "map1"); + map1.setAlwaysLock(true); + final AtomicBoolean called = new AtomicBoolean(); + map1.start(); + Group map2 = new Group(connection2, "map2"); + map2.setAlwaysLock(true); + map2.setMinimumGroupSize(2); + map2.start(); + map2.put("test", "foo"); + map2.lock("test"); + try { + map1.put("test", "bah"); + fail("Should have thrown an exception!"); + } catch (GroupMapUpdateException e) { + } + map2.unlock("test"); + map1.lock("test"); + try { + map2.lock("test"); + fail("Should have thrown an exception!"); + } catch (GroupMapUpdateException e) { + } + map1.stop(); + map2.stop(); + } + + /** - * Test method for {@link org.apache.activemq.group.GroupMap#clear()}. + * Test method for {@link org.apache.activemq.group.Group#clear()}. * * @throws Exception */ public void testClear() throws Exception { - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); map1.addMapChangedListener(new DefaultMapChangedListener() { public void mapInsert(Member owner,Object Key, Object Value) { @@ -176,7 +246,7 @@ public class GroupMapTest extends TestCase { } }); map1.start(); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.start(); map2.put("test","foo"); synchronized(called) { @@ -202,12 +272,12 @@ public class GroupMapTest extends TestCase { * Test a new map is populated for existing values */ public void testMapUpdatedOnStart() throws Exception { - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); map1.start(); map1.put("test", "foo"); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.addMapChangedListener(new DefaultMapChangedListener() { public void mapInsert(Member owner,Object Key, Object Value) { synchronized(called) { @@ -230,9 +300,9 @@ public class GroupMapTest extends TestCase { map1.stop(); map2.stop(); } - + public void testContainsKey() throws Exception { - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); map1.addMapChangedListener(new DefaultMapChangedListener() { public void mapInsert(Member owner,Object Key, Object Value) { @@ -243,7 +313,7 @@ public class GroupMapTest extends TestCase { } }); map1.start(); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.start(); map2.put("test","foo"); synchronized(called) { @@ -261,11 +331,11 @@ public class GroupMapTest extends TestCase { /** * Test method for - * {@link org.apache.activemq.group.GroupMap#containsValue(java.lang.Object)}. + * {@link org.apache.activemq.group.Group#containsValue(java.lang.Object)}. * @throws Exception */ public void testContainsValue() throws Exception { - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); map1.addMapChangedListener(new DefaultMapChangedListener() { public void mapInsert(Member owner,Object Key, Object Value) { @@ -276,7 +346,7 @@ public class GroupMapTest extends TestCase { } }); map1.start(); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.start(); map2.put("test","foo"); synchronized(called) { @@ -299,11 +369,11 @@ public class GroupMapTest extends TestCase { /** * Test method for - * {@link org.apache.activemq.group.GroupMap#get(java.lang.Object)}. + * {@link org.apache.activemq.group.Group#get(java.lang.Object)}. * @throws Exception */ public void testGet() throws Exception { - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); map1.addMapChangedListener(new DefaultMapChangedListener() { public void mapInsert(Member owner,Object Key, Object Value) { @@ -314,7 +384,7 @@ public class GroupMapTest extends TestCase { } }); map1.start(); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.start(); map2.put("test","foo"); synchronized(called) { @@ -327,15 +397,29 @@ public class GroupMapTest extends TestCase { map1.stop(); map2.stop(); } + + public void testPut() throws Exception { + Group map1 = new Group(connection1,"map1"); + map1.start(); + Group map2 = new Group(connection2,"map2"); + map2.setMinimumGroupSize(2); + map2.start(); + Object value = map1.put("foo", "blob"); + assertNull(value); + value = map1.put("foo", "blah"); + assertEquals(value, "blob"); + map1.stop(); + map2.stop(); + } /** * Test method for - * {@link org.apache.activemq.group.GroupMap#remove(java.lang.Object)}. + * {@link org.apache.activemq.group.Group#remove(java.lang.Object)}. */ public void testRemove() throws Exception{ - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); map1.addMapChangedListener(new DefaultMapChangedListener() { public void mapInsert(Member owner,Object Key, Object Value) { @@ -353,7 +437,7 @@ public class GroupMapTest extends TestCase { } }); map1.start(); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.start(); map2.put("test","foo"); synchronized(called) { @@ -380,7 +464,7 @@ public class GroupMapTest extends TestCase { final AtomicBoolean called1 = new AtomicBoolean(); final AtomicBoolean called2 = new AtomicBoolean(); - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); map1.setTimeToLive(1000); map1.addMapChangedListener(new DefaultMapChangedListener() { public void mapRemove(Member owner, Object key, Object value,boolean expired) { @@ -392,7 +476,7 @@ public class GroupMapTest extends TestCase { }); map1.start(); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.addMapChangedListener(new DefaultMapChangedListener() { public void mapRemove(Member owner, Object key, Object value,boolean expired) { diff --git a/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java b/activemq-core/src/test/java/org/apache/activemq/group/GroupMemberTest.java similarity index 57% rename from activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java rename to activemq-core/src/test/java/org/apache/activemq/group/GroupMemberTest.java index 8c453bb51d..ce42e9500c 100644 --- a/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/group/GroupMemberTest.java @@ -26,27 +26,45 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; -public class GroupMapMemberTest extends TestCase { +public class GroupMemberTest extends TestCase { protected BrokerService broker; protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; + + public void testCoordinatorSelection() throws Exception{ + Group group = new Group(null,""); + Listlist = new ArrayList(); + final int number =10; + Member choosen = null; + for (int i =0;i< number;i++) { + Member m = new Member("group"+i); + m.setId(""+i); + if (number/2==i) { + m.setCoordinatorWeight(10); + choosen=m; + } + list.add(m); + } + Member c = group.selectCordinator(list); + assertEquals(c,choosen); + } /** * Test method for - * {@link org.apache.activemq.group.GroupMap#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}. + * {@link org.apache.activemq.group.Group#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}. * @throws Exception */ public void testGroup() throws Exception { - int number = 20; + final int number = 10; Listconnections = new ArrayList(); - ListgroupMaps = new ArrayList(); + ListgroupMaps = new ArrayList(); ConnectionFactory factory = createConnectionFactory(); for (int i =0; i < number; i++) { Connection connection = factory.createConnection(); connection.start(); connections.add(connection); - GroupMap map = new GroupMap(connection,"map"+i); - map.setHeartBeatInterval(20000); + Group map = new Group(connection,"map"+i); + map.setHeartBeatInterval(200); if(i ==number-1) { map.setMinimumGroupSize(number); } @@ -54,20 +72,56 @@ public class GroupMapMemberTest extends TestCase { groupMaps.add(map); } + int coordinatorNumber = 0; + for (Group map:groupMaps) { + if (map.isCoordinator()) { + coordinatorNumber++; + } + } + for(Group map:groupMaps) { + map.stop(); + } + for (Connection connection:connections) { + connection.stop(); + } + + } + +public void XtestWeightedGroup() throws Exception { + + final int number = 10; + Listconnections = new ArrayList(); + ListgroupMaps = new ArrayList(); + Group last = null; + ConnectionFactory factory = createConnectionFactory(); + for (int i =0; i < number; i++) { + Connection connection = factory.createConnection(); + connection.start(); + connections.add(connection); + Group map = new Group(connection,"map"+i); + map.setHeartBeatInterval(200); + if(i ==number-1) { + map.setMinimumGroupSize(number); + map.setCoordinatorWeight(10); + last=map; + } + map.start(); + groupMaps.add(map); + } + int coordinator = 0; - for (GroupMap map:groupMaps) { + Group groupCoordinator = null; + for (Group map:groupMaps) { if (map.isCoordinator()) { coordinator++; + groupCoordinator=map; } } + assertNotNull(groupCoordinator); + assertEquals(groupCoordinator, last); assertEquals(1,coordinator); - groupMaps.get(0).put("key", "value"); - Thread.sleep(2000); - for (GroupMap map:groupMaps) { - assertTrue(map.get("key").equals("value")); - } - for(GroupMap map:groupMaps) { + for(Group map:groupMaps) { map.stop(); } for (Connection connection:connections) { diff --git a/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java b/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java new file mode 100644 index 0000000000..fa74b0d03c --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.group; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; + +public class GroupMessageTest extends TestCase { + protected BrokerService broker; + protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; + + public void testGroupBroadcast() throws Exception { + final int number = 10; + final AtomicInteger count = new AtomicInteger(); + List connections = new ArrayList(); + List groups = new ArrayList(); + ConnectionFactory factory = createConnectionFactory(); + for (int i = 0; i < number; i++) { + Connection connection = factory.createConnection(); + connection.start(); + connections.add(connection); + Group group = new Group(connection, "group" + i); + group.setHeartBeatInterval(20000); + if (i == number - 1) { + group.setMinimumGroupSize(number); + } + group.start(); + groups.add(group); + group.addGroupMessageListener(new GroupMessageListener() { + public void messageDelivered(Member sender, String replyId, + Object message) { + synchronized (count) { + if (count.incrementAndGet() == number) { + count.notifyAll(); + } + } + } + }); + } + groups.get(0).broadcastMessage("hello"); + synchronized (count) { + if (count.get() < number) { + count.wait(5000); + } + } + assertEquals(number, count.get()); + for (Group map : groups) { + map.stop(); + } + for (Connection connection : connections) { + connection.stop(); + } + } + + public void testsendMessage() throws Exception { + final int number = 10; + final AtomicInteger count = new AtomicInteger(); + List connections = new ArrayList(); + List groups = new ArrayList(); + ConnectionFactory factory = createConnectionFactory(); + for (int i = 0; i < number; i++) { + Connection connection = factory.createConnection(); + connection.start(); + connections.add(connection); + Group group = new Group(connection, "group" + i); + group.setHeartBeatInterval(20000); + if (i == number - 1) { + group.setMinimumGroupSize(number); + } + group.start(); + groups.add(group); + group.addGroupMessageListener(new GroupMessageListener() { + public void messageDelivered(Member sender, String replyId, + Object message) { + synchronized (count) { + count.incrementAndGet(); + count.notifyAll(); + } + } + }); + } + groups.get(0).sendMessage("hello"); + synchronized (count) { + if (count.get() == 0) { + count.wait(5000); + } + } + // wait a while to check that only one got it + Thread.sleep(2000); + assertEquals(1, count.get()); + for (Group map : groups) { + map.stop(); + } + for (Connection connection : connections) { + connection.stop(); + } + } + + public void testSendToSingleMember() throws Exception { + ConnectionFactory factory = createConnectionFactory(); + Connection connection1 = factory.createConnection(); + Connection connection2 = factory.createConnection(); + connection1.start(); + connection2.start(); + Group group1 = new Group(connection1, "group1"); + final AtomicBoolean called = new AtomicBoolean(); + group1.addGroupMessageListener(new GroupMessageListener() { + public void messageDelivered(Member sender, String replyId, + Object message) { + synchronized (called) { + called.set(true); + called.notifyAll(); + } + } + }); + group1.start(); + Group group2 = new Group(connection2, "group2"); + group2.setMinimumGroupSize(2); + group2.start(); + Member member1 = group2.getMemberByName("group1"); + group2.sendMessage(member1, "hello"); + synchronized (called) { + if (!called.get()) { + called.wait(5000); + } + } + assertTrue(called.get()); + group1.stop(); + group2.stop(); + connection1.close(); + connection2.close(); + } + + public void testSendRequestReply() throws Exception { + ConnectionFactory factory = createConnectionFactory(); + Connection connection1 = factory.createConnection(); + Connection connection2 = factory.createConnection(); + connection1.start(); + connection2.start(); + final int number = 1000; + final AtomicInteger requestCount = new AtomicInteger(); + final AtomicInteger replyCount = new AtomicInteger(); + final List requests = new ArrayList(); + final List replies = new ArrayList(); + for (int i = 0; i < number; i++) { + requests.add("request" + i); + replies.add("reply" + i); + } + final Group group1 = new Group(connection1, "group1"); + final AtomicBoolean finished = new AtomicBoolean(); + group1.addGroupMessageListener(new GroupMessageListener() { + public void messageDelivered(Member sender, String replyId, + Object message) { + if (!replies.isEmpty()) { + String reply = replies.remove(0); + try { + group1.sendMessageResponse(sender, replyId, reply); + } catch (JMSException e) { + e.printStackTrace(); + } + } + } + }); + group1.start(); + final Group group2 = new Group(connection2, "group2"); + group2.setMinimumGroupSize(2); + group2.addGroupMessageListener(new GroupMessageListener() { + public void messageDelivered(Member sender, String replyId, + Object message) { + if (!requests.isEmpty()) { + String request = requests.remove(0); + try { + group2.sendMessage(sender, request); + } catch (JMSException e) { + e.printStackTrace(); + } + }else { + synchronized (finished) { + finished.set(true); + finished.notifyAll(); + } + } + } + }); + group2.start(); + Member member1 = group2.getMemberByName("group1"); + group2.sendMessage(member1, requests.remove(0)); + synchronized (finished) { + if (!finished.get()) { + finished.wait(10000); + } + } + assertTrue(finished.get()); + group1.stop(); + group2.stop(); + connection1.close(); + connection2.close(); + } + + protected void setUp() throws Exception { + if (broker == null) { + broker = createBroker(); + } + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + if (broker != null) { + broker.stop(); + } + } + + protected ActiveMQConnectionFactory createConnectionFactory() + throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory( + ActiveMQConnection.DEFAULT_BROKER_URL); + return cf; + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + configureBroker(answer); + answer.start(); + return answer; + } + + protected void configureBroker(BrokerService answer) throws Exception { + answer.setPersistent(false); + answer.addConnector(bindAddress); + answer.setDeleteAllMessagesOnStartup(true); + } +}