tidied up a little

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@801515 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2009-08-06 06:15:52 +00:00
parent 5a80d5a6c0
commit 72a047f338
1 changed files with 133 additions and 242 deletions

View File

@ -66,38 +66,31 @@ import org.apache.commons.logging.LogFactory;
/**
* <P>
* A <CODE>Group</CODE> is a distributed collaboration implementation that is
* used to shared state and process messages amongst a distributed group of
* other <CODE>Group</CODE> instances. Membership of a group is handled
* A <CODE>Group</CODE> is a distributed collaboration implementation that is used to shared state and process
* messages amongst a distributed group of other <CODE>Group</CODE> instances. Membership of a group is handled
* automatically using discovery.
* <P>
* The underlying transport is JMS and there are some optimizations that occur
* for membership if used with ActiveMQ - but <CODE>Group</CODE> can be used
* with any JMS implementation.
* The underlying transport is JMS and there are some optimizations that occur for membership if used with ActiveMQ -
* but <CODE>Group</CODE> can be used with any JMS implementation.
*
* <P>
* Updates to the group shared map are controlled by a coordinator. The
* coordinator is elected by the member with the lowest lexicographical id -
* based on the bully algorithm [Silberschatz et al. 1993]
* Updates to the group shared map are controlled by a coordinator. The coordinator is elected by the member with the
* lowest lexicographical id - based on the bully algorithm [Silberschatz et al. 1993]
* <P>
* The {@link #selectCordinator(Collection<Member> members)} method may be
* overridden to implement a custom mechanism for choosing how the coordinator
* is elected for the map.
* The {@link #selectCordinator(Collection<Member> members)} method may be overridden to implement a custom mechanism
* for choosing how the coordinator is elected for the map.
* <P>
* New <CODE>Group</CODE> instances have their state updated by the
* coordinator, and coordinator failure is handled automatically within the
* group.
* New <CODE>Group</CODE> instances have their state updated by the coordinator, and coordinator failure is handled
* automatically within the group.
* <P>
* All map updates are totally ordered through the coordinator, whilst read
* operations happen locally.
* All map updates are totally ordered through the coordinator, whilst read operations happen locally.
* <P>
* A <CODE>Group</CODE> supports the concept of owner only updates(write
* locks), shared updates, entry expiration times and removal on owner exit -
* all of which are optional. In addition, you can grab and release locks for
* values in the map, independently of who created them.
* A <CODE>Group</CODE> supports the concept of owner only updates(write locks), shared updates, entry expiration
* times and removal on owner exit - all of which are optional. In addition, you can grab and release locks for values
* in the map, independently of who created them.
* <P>
* In addition, members of a group can broadcast messages and implement
* request/response with other <CODE>Group</CODE> instances.
* In addition, members of a group can broadcast messages and implement request/response with other <CODE>Group</CODE>
* instances.
*
* <P>
*
@ -114,10 +107,8 @@ public class Group<K, V> implements Map<K, V>, Service {
public static final long DEFAULT_HEART_BEAT_INTERVAL = 1000;
private static final long EXPIRATION_SWEEP_INTERVAL = 500;
private static final Log LOG = LogFactory.getLog(Group.class);
private static final String STATE_PREFIX = "STATE." + Group.class.getName()
+ ".";
private static final String GROUP_MESSAGE_PREFIX = "MESSAGE."
+ Group.class.getName() + ".";
private static final String STATE_PREFIX = "STATE." + Group.class.getName() + ".";
private static final String GROUP_MESSAGE_PREFIX = "MESSAGE." + Group.class.getName() + ".";
private static final String STATE_TYPE = "state";
private static final String MESSAGE_TYPE = "message";
private static final String MEMBER_ID_PROPERTY = "memberId";
@ -184,8 +175,7 @@ public class Group<K, V> implements Map<K, V>, Service {
}
/**
* Set the local map implementation to be used By default its a HashMap -
* but you could use a Cache for example
* Set the local map implementation to be used By default its a HashMap - but you could use a Cache for example
*
* @param map
*/
@ -209,31 +199,22 @@ public class Group<K, V> implements Map<K, V>, Service {
}
}
this.connection.start();
this.stateSession = this.connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
this.messageSession = this.connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
this.stateSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.messageSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.stateProducer = this.stateSession.createProducer(null);
this.stateProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
this.inboxTopic = this.stateSession.createTemporaryTopic();
String stateTopicName = STATE_PREFIX + this.groupName;
this.stateTopic = this.stateSession.createTopic(stateTopicName);
this.heartBeatTopic = this.stateSession.createTopic(stateTopicName
+ ".heartbeat");
String messageDestinationName = GROUP_MESSAGE_PREFIX
+ this.groupName;
this.messageTopic = this.messageSession
.createTopic(messageDestinationName);
this.messageQueue = this.messageSession
.createQueue(messageDestinationName);
MessageConsumer privateInbox = this.messageSession
.createConsumer(this.inboxTopic);
MessageConsumer memberChangeConsumer = this.stateSession
.createConsumer(this.stateTopic);
this.heartBeatTopic = this.stateSession.createTopic(stateTopicName + ".heartbeat");
String messageDestinationName = GROUP_MESSAGE_PREFIX + this.groupName;
this.messageTopic = this.messageSession.createTopic(messageDestinationName);
this.messageQueue = this.messageSession.createQueue(messageDestinationName);
MessageConsumer privateInbox = this.messageSession.createConsumer(this.inboxTopic);
MessageConsumer memberChangeConsumer = this.stateSession.createConsumer(this.stateTopic);
String memberId = null;
if (memberChangeConsumer instanceof ActiveMQMessageConsumer) {
memberId = ((ActiveMQMessageConsumer) memberChangeConsumer)
.getConsumerId().toString();
memberId = ((ActiveMQMessageConsumer) memberChangeConsumer).getConsumerId().toString();
} else {
memberId = this.idGenerator.generateId();
}
@ -252,63 +233,53 @@ public class Group<K, V> implements Map<K, V>, Service {
});
this.messageProducer = this.messageSession.createProducer(null);
this.messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageConsumer topicMessageConsumer = this.messageSession
.createConsumer(this.messageTopic);
MessageConsumer topicMessageConsumer = this.messageSession.createConsumer(this.messageTopic);
topicMessageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
processJMSMessage(message);
}
});
MessageConsumer queueMessageConsumer = this.messageSession
.createConsumer(this.messageQueue);
MessageConsumer queueMessageConsumer = this.messageSession.createConsumer(this.messageQueue);
queueMessageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
processJMSMessage(message);
}
});
MessageConsumer heartBeatConsumer = this.stateSession
.createConsumer(this.heartBeatTopic);
MessageConsumer heartBeatConsumer = this.stateSession.createConsumer(this.heartBeatTopic);
heartBeatConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
handleHeartbeats(message);
}
});
this.consumerEvents = new ConsumerEventSource(this.connection,
this.stateTopic);
this.consumerEvents = new ConsumerEventSource(this.connection, this.stateTopic);
this.consumerEvents.setConsumerListener(new ConsumerListener() {
public void onConsumerEvent(ConsumerEvent event) {
handleConsumerEvents(event);
}
});
this.consumerEvents.start();
this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "Election{"
+ Group.this.local + "}");
thread.setDaemon(true);
return thread;
}
});
this.stateExecutor = Executors
.newSingleThreadExecutor(new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "Group State{"
+ Group.this.local + "}");
thread.setDaemon(true);
return thread;
}
});
this.messageExecutor = Executors
.newSingleThreadExecutor(new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable,
"Group Messages{" + Group.this.local + "}");
Thread thread = new Thread(runnable, "Election{" + Group.this.local + "}");
thread.setDaemon(true);
return thread;
}
});
this.stateExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "Group State{" + Group.this.local + "}");
thread.setDaemon(true);
return thread;
}
});
this.messageExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "Group Messages{" + Group.this.local + "}");
thread.setDaemon(true);
return thread;
}
});
sendHeartBeat();
this.heartBeatTask = new SchedulerTimerTask(new Runnable() {
public void run() {
@ -326,19 +297,13 @@ public class Group<K, V> implements Map<K, V>, Service {
}
});
this.timer = new Timer("Distributed heart beat", true);
this.timer.scheduleAtFixedRate(this.heartBeatTask,
getHeartBeatInterval() / 3, getHeartBeatInterval() / 2);
this.timer.scheduleAtFixedRate(this.checkMembershipTask,
getHeartBeatInterval(), getHeartBeatInterval());
this.timer.scheduleAtFixedRate(this.expirationTask,
EXPIRATION_SWEEP_INTERVAL, EXPIRATION_SWEEP_INTERVAL);
this.timer.scheduleAtFixedRate(this.heartBeatTask, getHeartBeatInterval() / 3, getHeartBeatInterval() / 2);
this.timer.scheduleAtFixedRate(this.checkMembershipTask, getHeartBeatInterval(), getHeartBeatInterval());
this.timer.scheduleAtFixedRate(this.expirationTask, EXPIRATION_SWEEP_INTERVAL, EXPIRATION_SWEEP_INTERVAL);
// await for members to join
long timeout = (long) (this.heartBeatInterval
* this.minimumGroupSize *1.5);
long timeout = (long) (this.heartBeatInterval * this.minimumGroupSize * 1.5);
long deadline = System.currentTimeMillis() + timeout;
while ((this.members.size() < this.minimumGroupSize || !this.electionFinished
.get())
&& timeout > 0) {
while ((this.members.size() < this.minimumGroupSize || !this.electionFinished.get()) && timeout > 0) {
synchronized (this.electionFinished) {
this.electionFinished.wait(timeout);
}
@ -376,7 +341,6 @@ public class Group<K, V> implements Map<K, V>, Service {
} catch (Exception e) {
LOG.debug("Caught exception stopping", e);
}
}
}
@ -421,8 +385,7 @@ public class Group<K, V> implements Map<K, V>, Service {
/**
* @param alwaysLock -
* set true if objects inserted will always be locked (default is
* false)
* set true if objects inserted will always be locked (default is false)
*/
public void setAlwaysLock(boolean alwaysLock) {
this.alwaysLock = alwaysLock;
@ -520,8 +483,7 @@ public class Group<K, V> implements Map<K, V>, Service {
}
/**
* Sets the policy for owned objects in the group If set to true, when this
* <code>GroupMap<code> stops,
* Sets the policy for owned objects in the group If set to true, when this <code>GroupMap<code> stops,
* any objects it owns will be removed from the group map
* @param removeOwnedObjectsOnExit the removeOwnedObjectsOnExit to set
*/
@ -612,16 +574,14 @@ public class Group<K, V> implements Map<K, V>, Service {
public boolean containsKey(Object key) {
synchronized (this.mapMutex) {
return this.localMap != null ? this.localMap.containsKey(key)
: false;
return this.localMap != null ? this.localMap.containsKey(key) : false;
}
}
public boolean containsValue(Object value) {
EntryValue entryValue = new EntryValue(null, value);
synchronized (this.mapMutex) {
return this.localMap != null ? this.localMap
.containsValue(entryValue) : false;
return this.localMap != null ? this.localMap.containsValue(entryValue) : false;
}
}
@ -669,10 +629,9 @@ public class Group<K, V> implements Map<K, V>, Service {
* @throws IllegalStateException
*
*/
public V put(K key, V value) throws GroupUpdateException,
IllegalStateException {
return put(key, value, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),
isReleaseLockOnExit(), getTimeToLive(), getLockTimeToLive());
public V put(K key, V value) throws GroupUpdateException, IllegalStateException {
return put(key, value, isAlwaysLock(), isRemoveOwnedObjectsOnExit(), isReleaseLockOnExit(), getTimeToLive(),
getLockTimeToLive());
}
/**
@ -690,9 +649,8 @@ public class Group<K, V> implements Map<K, V>, Service {
* @throws IllegalStateException
*
*/
public V put(K key, V value, boolean lock, boolean removeOnExit,
boolean releaseLockOnExit, long timeToLive, long leaseTime)
throws GroupUpdateException, IllegalStateException {
public V put(K key, V value, boolean lock, boolean removeOnExit, boolean releaseLockOnExit, long timeToLive,
long leaseTime) throws GroupUpdateException, IllegalStateException {
checkStatus();
EntryKey<K> entryKey = new EntryKey<K>(this.local, key);
entryKey.setLocked(lock);
@ -756,10 +714,9 @@ public class Group<K, V> implements Map<K, V>, Service {
* @throws GroupUpdateException
* @throws IllegalStateException
*/
public void putAll(Map<? extends K, ? extends V> t)
throws GroupUpdateException, IllegalStateException {
putAll(t, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),
isReleaseLockOnExit(), getTimeToLive(), getLockTimeToLive());
public void putAll(Map<? extends K, ? extends V> t) throws GroupUpdateException, IllegalStateException {
putAll(t, isAlwaysLock(), isRemoveOwnedObjectsOnExit(), isReleaseLockOnExit(), getTimeToLive(),
getLockTimeToLive());
}
/**
@ -774,13 +731,10 @@ public class Group<K, V> implements Map<K, V>, Service {
* @throws GroupUpdateException
* @throws IllegalStateException
*/
public void putAll(Map<? extends K, ? extends V> t, boolean lock,
boolean removeOnExit, boolean releaseLockOnExit, long timeToLive,
long lockTimeToLive) throws GroupUpdateException,
IllegalStateException {
public void putAll(Map<? extends K, ? extends V> t, boolean lock, boolean removeOnExit, boolean releaseLockOnExit,
long timeToLive, long lockTimeToLive) throws GroupUpdateException, IllegalStateException {
for (java.util.Map.Entry<? extends K, ? extends V> entry : t.entrySet()) {
put(entry.getKey(), entry.getValue(), lock, removeOnExit,
releaseLockOnExit, timeToLive, lockTimeToLive);
put(entry.getKey(), entry.getValue(), lock, removeOnExit, releaseLockOnExit, timeToLive, lockTimeToLive);
}
}
@ -793,14 +747,12 @@ public class Group<K, V> implements Map<K, V>, Service {
* @throws IllegalStateException
*
*/
public V remove(Object key) throws GroupUpdateException,
IllegalStateException {
public V remove(Object key) throws GroupUpdateException, IllegalStateException {
EntryKey<K> entryKey = new EntryKey<K>(this.local, (K) key);
return doRemove(entryKey);
}
V doRemove(EntryKey<K> key) throws GroupUpdateException,
IllegalStateException {
V doRemove(EntryKey<K> key) throws GroupUpdateException, IllegalStateException {
checkStatus();
EntryMessage entryMsg = new EntryMessage();
entryMsg.setKey(key);
@ -863,8 +815,7 @@ public class Group<K, V> implements Map<K, V>, Service {
}
/**
* @return the local member that represents this <CODE>Group</CODE>
* instance
* @return the local member that represents this <CODE>Group</CODE> instance
*/
public Member getLocalMember() {
return this.local;
@ -881,8 +832,7 @@ public class Group<K, V> implements Map<K, V>, Service {
}
boolean result = false;
if (entryValue != null) {
result = entryValue.getKey().getOwner().getId().equals(
this.local.getId());
result = entryValue.getKey().getOwner().getId().equals(this.local.getId());
}
return result;
}
@ -934,8 +884,7 @@ public class Group<K, V> implements Map<K, V>, Service {
*/
public void broadcastMessage(Object message) throws JMSException {
checkStatus();
ObjectMessage objMsg = this.messageSession
.createObjectMessage((Serializable) message);
ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message);
objMsg.setJMSCorrelationID(this.idGenerator.generateId());
objMsg.setJMSType(MESSAGE_TYPE);
objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
@ -952,8 +901,7 @@ public class Group<K, V> implements Map<K, V>, Service {
* @return
* @throws JMSException
*/
public Serializable broadcastMessageRequest(Object message, long timeout)
throws JMSException {
public Serializable broadcastMessageRequest(Object message, long timeout) throws JMSException {
checkStatus();
Object result = null;
MapRequest request = new MapRequest();
@ -961,8 +909,7 @@ public class Group<K, V> implements Map<K, V>, Service {
synchronized (this.messageRequests) {
this.messageRequests.put(id, request);
}
ObjectMessage objMsg = this.stateSession
.createObjectMessage((Serializable) message);
ObjectMessage objMsg = this.stateSession.createObjectMessage((Serializable) message);
objMsg.setJMSReplyTo(this.inboxTopic);
objMsg.setJMSCorrelationID(id);
objMsg.setJMSType(MESSAGE_TYPE);
@ -973,16 +920,14 @@ public class Group<K, V> implements Map<K, V>, Service {
}
/**
* Send a message to the group - but only the least loaded member will
* process it
* Send a message to the group - but only the least loaded member will process it
*
* @param message
* @throws JMSException
*/
public void sendMessage(Object message) throws JMSException {
checkStatus();
ObjectMessage objMsg = this.messageSession
.createObjectMessage((Serializable) message);
ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message);
objMsg.setJMSCorrelationID(this.idGenerator.generateId());
objMsg.setJMSType(MESSAGE_TYPE);
objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
@ -998,8 +943,7 @@ public class Group<K, V> implements Map<K, V>, Service {
*/
public void sendMessage(Member member, Object message) throws JMSException {
checkStatus();
ObjectMessage objMsg = this.messageSession
.createObjectMessage((Serializable) message);
ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message);
objMsg.setJMSCorrelationID(this.idGenerator.generateId());
objMsg.setJMSType(MESSAGE_TYPE);
objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
@ -1016,8 +960,7 @@ public class Group<K, V> implements Map<K, V>, Service {
* @return the request or null
* @throws JMSException
*/
public Object sendMessageRequest(Member member, Object message, long timeout)
throws JMSException {
public Object sendMessageRequest(Member member, Object message, long timeout) throws JMSException {
checkStatus();
Object result = null;
MapRequest request = new MapRequest();
@ -1025,8 +968,7 @@ public class Group<K, V> implements Map<K, V>, Service {
synchronized (this.messageRequests) {
this.messageRequests.put(id, request);
}
ObjectMessage objMsg = this.stateSession
.createObjectMessage((Serializable) message);
ObjectMessage objMsg = this.stateSession.createObjectMessage((Serializable) message);
objMsg.setJMSReplyTo(this.inboxTopic);
objMsg.setJMSCorrelationID(id);
objMsg.setJMSType(MESSAGE_TYPE);
@ -1044,11 +986,9 @@ public class Group<K, V> implements Map<K, V>, Service {
* @param message
* @throws JMSException
*/
public void sendMessageResponse(Member member, String replyId,
Object message) throws JMSException {
public void sendMessageResponse(Member member, String replyId, Object message) throws JMSException {
checkStatus();
ObjectMessage objMsg = this.messageSession
.createObjectMessage((Serializable) message);
ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message);
objMsg.setJMSCorrelationID(replyId);
objMsg.setJMSType(MESSAGE_TYPE);
objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
@ -1056,24 +996,21 @@ public class Group<K, V> implements Map<K, V>, Service {
}
/**
* Select a coordinator - coordinator weighting is used - or if everything
* is equal - a comparison of member ids.
* Select a coordinator - coordinator weighting is used - or if everything is equal - a comparison of member ids.
*
* @param members
* @return
*/
protected Member selectCordinator(List<Member> list) {
List<Member> sorted = sortMemberList(list);
Member result = sorted.isEmpty() ? this.local : sorted
.get(list.size() - 1);
Member result = sorted.isEmpty() ? this.local : sorted.get(list.size() - 1);
return result;
}
protected List<Member> sortMemberList(List<Member> list) {
Collections.sort(list, new Comparator<Member>() {
public int compare(Member m1, Member m2) {
int result = m1.getCoordinatorWeight()
- m2.getCoordinatorWeight();
int result = m1.getCoordinatorWeight() - m2.getCoordinatorWeight();
if (result == 0) {
result = m1.getId().compareTo(m2.getId());
}
@ -1091,8 +1028,7 @@ public class Group<K, V> implements Map<K, V>, Service {
this.stateRequests.put(id, request);
}
try {
ObjectMessage objMsg = this.stateSession
.createObjectMessage(payload);
ObjectMessage objMsg = this.stateSession.createObjectMessage(payload);
objMsg.setJMSReplyTo(this.inboxTopic);
objMsg.setJMSCorrelationID(id);
objMsg.setJMSType(STATE_TYPE);
@ -1113,8 +1049,7 @@ public class Group<K, V> implements Map<K, V>, Service {
return result;
}
void sendAsyncStateRequest(AsyncMapRequest asyncRequest, Member member,
Serializable payload) {
void sendAsyncStateRequest(AsyncMapRequest asyncRequest, Member member, Serializable payload) {
MapRequest request = new MapRequest();
String id = this.idGenerator.generateId();
asyncRequest.add(id, request);
@ -1122,8 +1057,7 @@ public class Group<K, V> implements Map<K, V>, Service {
this.stateRequests.put(id, request);
}
try {
ObjectMessage objMsg = this.stateSession
.createObjectMessage(payload);
ObjectMessage objMsg = this.stateSession.createObjectMessage(payload);
objMsg.setJMSReplyTo(this.inboxTopic);
objMsg.setJMSCorrelationID(id);
objMsg.setJMSType(STATE_TYPE);
@ -1142,8 +1076,7 @@ public class Group<K, V> implements Map<K, V>, Service {
processRequest(id, reply);
} else {
try {
ObjectMessage replyMsg = this.stateSession
.createObjectMessage((Serializable) reply);
ObjectMessage replyMsg = this.stateSession.createObjectMessage((Serializable) reply);
replyMsg.setJMSCorrelationID(id);
replyMsg.setJMSType(STATE_TYPE);
this.stateProducer.send(replyTo, replyMsg);
@ -1162,8 +1095,7 @@ public class Group<K, V> implements Map<K, V>, Service {
try {
EntryMessage copy = entry.copy();
copy.setMapUpdate(true);
ObjectMessage objMsg = this.stateSession
.createObjectMessage(copy);
ObjectMessage objMsg = this.stateSession.createObjectMessage(copy);
objMsg.setJMSCorrelationID(correlationId);
objMsg.setJMSType(STATE_TYPE);
this.stateProducer.send(this.stateTopic, objMsg);
@ -1230,22 +1162,18 @@ public class Group<K, V> implements Map<K, V>, Service {
}
}
void processLockUpdate(EntryMessage entryMsg, Destination replyTo,
String correlationId) {
void processLockUpdate(EntryMessage entryMsg, Destination replyTo, String correlationId) {
waitForElection();
synchronized (this.mapMutex) {
boolean newLock = entryMsg.getKey().isLocked();
Member newOwner = entryMsg.getKey().getOwner();
long newLockExpiration = newLock ? entryMsg.getKey()
.getLockExpiration() : 0l;
long newLockExpiration = newLock ? entryMsg.getKey().getLockExpiration() : 0l;
if (isCoordinator() && !entryMsg.isMapUpdate()) {
EntryKey originalKey = getKey(entryMsg.getKey().getKey());
if (originalKey != null) {
if (originalKey.isLocked()) {
if (!originalKey.getOwner().equals(
entryMsg.getKey().getOwner())) {
Serializable reply = new GroupUpdateException(
"Owned by " + originalKey.getOwner());
if (!originalKey.getOwner().equals(entryMsg.getKey().getOwner())) {
Serializable reply = new GroupUpdateException("Owned by " + originalKey.getOwner());
sendReply(reply, replyTo, correlationId);
} else {
originalKey.setLocked(newLock);
@ -1271,13 +1199,11 @@ public class Group<K, V> implements Map<K, V>, Service {
}
}
void processEntryMessage(EntryMessage entryMsg, Destination replyTo,
String correlationId) {
void processEntryMessage(EntryMessage entryMsg, Destination replyTo, String correlationId) {
waitForElection();
if (isCoordinator()) {
EntryKey<K> key = entryMsg.getKey();
EntryValue<V> value = new EntryValue<V>(key, (V) entryMsg
.getValue());
EntryValue<V> value = new EntryValue<V>(key, (V) entryMsg.getValue());
boolean insert = entryMsg.isInsert();
boolean containsKey = false;
synchronized (this.mapMutex) {
@ -1285,8 +1211,7 @@ public class Group<K, V> implements Map<K, V>, Service {
}
if (containsKey) {
EntryKey originalKey = getKey(key.getKey());
if (originalKey.equals(key.getOwner())
|| !originalKey.isLocked()) {
if (originalKey.equals(key.getOwner()) || !originalKey.isLocked()) {
EntryValue<V> old = null;
if (insert) {
synchronized (this.mapMutex) {
@ -1299,11 +1224,9 @@ public class Group<K, V> implements Map<K, V>, Service {
}
entryMsg.setOldValue(old.getValue());
broadcastMapUpdate(entryMsg, correlationId);
fireMapChanged(key.getOwner(), key.getKey(),
old.getValue(), value.getValue(), false);
fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), value.getValue(), false);
} else {
Serializable reply = new GroupUpdateException(
"Owned by " + originalKey.getOwner());
Serializable reply = new GroupUpdateException("Owned by " + originalKey.getOwner());
sendReply(reply, replyTo, correlationId);
}
} else {
@ -1312,8 +1235,7 @@ public class Group<K, V> implements Map<K, V>, Service {
this.localMap.put(key.getKey(), value);
}
broadcastMapUpdate(entryMsg, correlationId);
fireMapChanged(key.getOwner(), key.getKey(), null, value
.getValue(), false);
fireMapChanged(key.getOwner(), key.getKey(), null, value.getValue(), false);
} else {
sendReply(null, replyTo, correlationId);
}
@ -1349,24 +1271,20 @@ public class Group<K, V> implements Map<K, V>, Service {
value.setValue(null);
}
}
fireMapChanged(key.getOwner(), key.getKey(),
old.getValue(), value.getValue(), entryMsg
.isExpired());
fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), value.getValue(), entryMsg.isExpired());
}
} else {
if (insert) {
synchronized (this.mapMutex) {
this.localMap.put(key.getKey(), value);
}
fireMapChanged(key.getOwner(), key.getKey(), null, value
.getValue(), false);
fireMapChanged(key.getOwner(), key.getKey(), null, value.getValue(), false);
}
}
}
}
void processGroupMessage(String memberId, String replyId,
Destination replyTo, Object payload) {
void processGroupMessage(String memberId, String replyId, Destination replyTo, Object payload) {
Member member = this.members.get(memberId);
if (member != null) {
fireMemberMessage(member, replyId, payload);
@ -1412,8 +1330,7 @@ public class Group<K, V> implements Map<K, V>, Service {
void handleConsumerEvents(ConsumerEvent event) {
if (!event.isStarted()) {
Member member = this.members.remove(event.getConsumerId()
.toString());
Member member = this.members.remove(event.getConsumerId().toString());
if (member != null) {
fireMemberStopped(member);
election(member, false);
@ -1423,8 +1340,7 @@ public class Group<K, V> implements Map<K, V>, Service {
void checkMembership() {
if (this.started.get() && this.electionFinished.get()) {
long checkTime = System.currentTimeMillis()
- getHeartBeatInterval();
long checkTime = System.currentTimeMillis() - getHeartBeatInterval();
boolean doElection = false;
for (Member member : this.members.values()) {
if (member.getTimeStamp() < checkTime) {
@ -1442,8 +1358,7 @@ public class Group<K, V> implements Map<K, V>, Service {
void expirationSweep() {
waitForElection();
if (isCoordinator() && this.started.get()
&& this.electionFinished.get()) {
if (isCoordinator() && this.started.get() && this.electionFinished.get()) {
List<EntryKey> expiredMessages = null;
List<EntryKey> expiredLocks = null;
synchronized (this.mapMutex) {
@ -1455,13 +1370,12 @@ public class Group<K, V> implements Map<K, V>, Service {
if (k.isExpired(currentTime)) {
if (expiredMessages == null) {
expiredMessages = new ArrayList<EntryKey>();
expiredMessages.add(k);
}
expiredMessages.add(k);
} else if (k.isLockExpired(currentTime)) {
k.setLocked(false);
if (expiredLocks == null) {
expiredLocks = new ArrayList<EntryKey>();
expiredLocks.add(k);
}
expiredLocks.add(k);
}
@ -1489,8 +1403,7 @@ public class Group<K, V> implements Map<K, V>, Service {
}
void doMessageExpiration(List<EntryKey> list) {
if (this.started.get() && this.electionFinished.get()
&& isCoordinator()) {
if (this.started.get() && this.electionFinished.get() && isCoordinator()) {
for (EntryKey k : list) {
EntryValue<V> old = null;
synchronized (this.mapMutex) {
@ -1503,16 +1416,14 @@ public class Group<K, V> implements Map<K, V>, Service {
entryMsg.setKey(k);
entryMsg.setValue(old.getValue());
broadcastMapUpdate(entryMsg, "");
fireMapChanged(k.getOwner(), k.getKey(), old.getValue(),
null, true);
fireMapChanged(k.getOwner(), k.getKey(), old.getValue(), null, true);
}
}
}
}
void doLockExpiration(List<EntryKey> list) {
if (this.started.get() && this.electionFinished.get()
&& isCoordinator()) {
if (this.started.get() && this.electionFinished.get() && isCoordinator()) {
for (EntryKey k : list) {
EntryMessage entryMsg = new EntryMessage();
entryMsg.setType(EntryMessage.MessageType.DELETE);
@ -1530,8 +1441,7 @@ public class Group<K, V> implements Map<K, V>, Service {
void sendHeartBeat(Destination destination) {
if (this.started.get()) {
try {
ObjectMessage msg = this.stateSession
.createObjectMessage(this.local);
ObjectMessage msg = this.stateSession.createObjectMessage(this.local);
msg.setJMSType(STATE_TYPE);
this.stateProducer.send(destination, msg);
} catch (javax.jms.IllegalStateException e) {
@ -1548,8 +1458,7 @@ public class Group<K, V> implements Map<K, V>, Service {
List<Map.Entry<K, EntryValue<V>>> list = new ArrayList<Map.Entry<K, EntryValue<V>>>();
synchronized (this.mapMutex) {
if (this.localMap != null) {
for (Map.Entry<K, EntryValue<V>> entry : this.localMap
.entrySet()) {
for (Map.Entry<K, EntryValue<V>> entry : this.localMap.entrySet()) {
list.add(entry);
}
}
@ -1561,12 +1470,10 @@ public class Group<K, V> implements Map<K, V>, Service {
entryMsg.setValue(entry.getValue().getValue());
entryMsg.setType(EntryMessage.MessageType.SYNC);
entryMsg.setMapUpdate(true);
ObjectMessage objMsg = this.stateSession
.createObjectMessage(entryMsg);
ObjectMessage objMsg = this.stateSession.createObjectMessage(entryMsg);
if (!member.equals(entry.getValue().getKey().getOwner())) {
objMsg.setJMSType(STATE_TYPE);
this.stateProducer.send(member.getInBoxDestination(),
objMsg);
this.stateProducer.send(member.getInBoxDestination(), objMsg);
}
}
} catch (javax.jms.IllegalStateException e) {
@ -1615,16 +1522,13 @@ public class Group<K, V> implements Map<K, V>, Service {
synchronized (this.mapMutex) {
value = this.localMap.remove(entryKey);
}
fireMapChanged(member, entryKey.getKey(), value.getValue(),
null, false);
fireMapChanged(member, entryKey.getKey(), value.getValue(), null, false);
}
}
}
void fireMemberMessage(final Member member, final String replyId,
final Object message) {
if (this.started.get() && this.stateExecutor != null
&& !this.messageExecutor.isShutdown()) {
void fireMemberMessage(final Member member, final String replyId, final Object message) {
if (this.started.get() && this.stateExecutor != null && !this.messageExecutor.isShutdown()) {
this.messageExecutor.execute(new Runnable() {
public void run() {
doFireMemberMessage(member, replyId, message);
@ -1641,10 +1545,9 @@ public class Group<K, V> implements Map<K, V>, Service {
}
}
void fireMapChanged(final Member owner, final Object key,
final Object oldValue, final Object newValue, final boolean expired) {
if (this.started.get() && this.stateExecutor != null
&& !this.stateExecutor.isShutdown()) {
void fireMapChanged(final Member owner, final Object key, final Object oldValue, final Object newValue,
final boolean expired) {
if (this.started.get() && this.stateExecutor != null && !this.stateExecutor.isShutdown()) {
this.stateExecutor.execute(new Runnable() {
public void run() {
doFireMapChanged(owner, key, oldValue, newValue, expired);
@ -1653,8 +1556,7 @@ public class Group<K, V> implements Map<K, V>, Service {
}
}
void doFireMapChanged(Member owner, Object key, Object oldValue,
Object newValue, boolean expired) {
void doFireMapChanged(Member owner, Object key, Object oldValue, Object newValue, boolean expired) {
if (this.started.get()) {
for (GroupStateChangedListener l : this.mapChangedListeners) {
if (oldValue == null) {
@ -1670,28 +1572,24 @@ public class Group<K, V> implements Map<K, V>, Service {
void checkStatus() throws IllegalStateException {
if (!started.get()) {
throw new IllegalStateException("GroupMap " + this.local.getName()
+ " not started");
throw new IllegalStateException("GroupMap " + this.local.getName() + " not started");
}
waitForElection();
}
public String toString() {
return "Group:" + getName() + "{id=" + this.local.getId()
+ ",coordinator=" + isCoordinator() + ",inbox="
return "Group:" + getName() + "{id=" + this.local.getId() + ",coordinator=" + isCoordinator() + ",inbox="
+ this.local.getInBoxDestination() + "}";
}
void election(final Member member, final boolean memberStarted) {
if (this.started.get() && this.stateExecutor != null
&& !this.electionExecutor.isShutdown()) {
if (this.started.get() && this.electionExecutor != null && !this.electionExecutor.isShutdown()) {
synchronized (this.electionFinished) {
this.electionFinished.set(false);
}
synchronized (this.electionExecutor) {
// remove any queued election tasks
List<Runnable> list = new ArrayList<Runnable>(
this.electionExecutor.getQueue());
List<Runnable> list = new ArrayList<Runnable>(this.electionExecutor.getQueue());
for (Runnable r : list) {
ElectionService es = (ElectionService) r;
es.stop();
@ -1723,8 +1621,7 @@ public class Group<K, V> implements Map<K, V>, Service {
return result;
}
void processElectionMessage(ElectionMessage msg, Destination replyTo,
String correlationId) {
void processElectionMessage(ElectionMessage msg, Destination replyTo, String correlationId) {
if (msg.isElection()) {
msg.setType(ElectionMessage.MessageType.ANSWER);
msg.setMember(this.local);
@ -1745,16 +1642,14 @@ public class Group<K, V> implements Map<K, V>, Service {
ElectionMessage msg = new ElectionMessage();
msg.setMember(this.local);
msg.setType(type);
ObjectMessage objMsg = this.stateSession
.createObjectMessage(msg);
ObjectMessage objMsg = this.stateSession.createObjectMessage(msg);
objMsg.setJMSType(STATE_TYPE);
this.stateProducer.send(this.stateTopic, objMsg);
} catch (javax.jms.IllegalStateException e) {
// ignore - we are stopping
} catch (JMSException e) {
if (this.started.get()) {
LOG.error("Failed to broadcast election message: " + type,
e);
LOG.error("Failed to broadcast election message: " + type, e);
}
}
}
@ -1795,30 +1690,26 @@ public class Group<K, V> implements Map<K, V>, Service {
}
void doElection() {
if ((this.member == null || (!this.member.equals(Group.this.local) || Group.this.members
.size() == getMinimumGroupSize()))) {
if ((this.member == null || (!this.member.equals(Group.this.local) || Group.this.members.size() == getMinimumGroupSize()))) {
boolean wasCoordinator = isCoordinatorMatch() && !isEmpty();
// call an election
while (!callElection() && isStarted() && this.started.get())
;
if (isStarted() && this.started.get()) {
List<Member> members = new ArrayList<Member>(
Group.this.members.values());
List<Member> members = new ArrayList<Member>(Group.this.members.values());
Group.this.coordinator = selectCordinator(members);
if (isCoordinatorMatch()) {
broadcastElectionType(ElectionMessage.MessageType.COORDINATOR);
}
if (this.memberStarted && this.member != null) {
if (wasCoordinator || isCoordinator()
&& this.started.get()) {
if (wasCoordinator || isCoordinator() && this.started.get()) {
updateNewMemberMap(this.member);
}
}
if (!isElectionFinished() && this.started.get()) {
try {
synchronized (Group.this.electionFinished) {
Group.this.electionFinished
.wait(Group.this.heartBeatInterval * 2);
Group.this.electionFinished.wait(Group.this.heartBeatInterval * 2);
}
} catch (InterruptedException e) {
}