From 72a047f338ac469f848d3080ddf5d1f05ba2f8df Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Thu, 6 Aug 2009 06:15:52 +0000 Subject: [PATCH] tidied up a little git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@801515 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/activegroups/Group.java | 375 +++++++----------- 1 file changed, 133 insertions(+), 242 deletions(-) diff --git a/activemq-groups/src/main/java/org/apache/activegroups/Group.java b/activemq-groups/src/main/java/org/apache/activegroups/Group.java index 0e87c0feba..be0f41228d 100644 --- a/activemq-groups/src/main/java/org/apache/activegroups/Group.java +++ b/activemq-groups/src/main/java/org/apache/activegroups/Group.java @@ -66,38 +66,31 @@ import org.apache.commons.logging.LogFactory; /** *

- * A Group is a distributed collaboration implementation that is - * used to shared state and process messages amongst a distributed group of - * other Group instances. Membership of a group is handled + * A Group is a distributed collaboration implementation that is used to shared state and process + * messages amongst a distributed group of other Group instances. Membership of a group is handled * automatically using discovery. *

- * The underlying transport is JMS and there are some optimizations that occur - * for membership if used with ActiveMQ - but Group can be used - * with any JMS implementation. + * The underlying transport is JMS and there are some optimizations that occur for membership if used with ActiveMQ - + * but Group can be used with any JMS implementation. * *

- * Updates to the group shared map are controlled by a coordinator. The - * coordinator is elected by the member with the lowest lexicographical id - - * based on the bully algorithm [Silberschatz et al. 1993] + * Updates to the group shared map are controlled by a coordinator. The coordinator is elected by the member with the + * lowest lexicographical id - based on the bully algorithm [Silberschatz et al. 1993] *

- * The {@link #selectCordinator(Collection members)} method may be - * overridden to implement a custom mechanism for choosing how the coordinator - * is elected for the map. + * The {@link #selectCordinator(Collection members)} method may be overridden to implement a custom mechanism + * for choosing how the coordinator is elected for the map. *

- * New Group instances have their state updated by the - * coordinator, and coordinator failure is handled automatically within the - * group. + * New Group instances have their state updated by the coordinator, and coordinator failure is handled + * automatically within the group. *

- * All map updates are totally ordered through the coordinator, whilst read - * operations happen locally. + * All map updates are totally ordered through the coordinator, whilst read operations happen locally. *

- * A Group supports the concept of owner only updates(write - * locks), shared updates, entry expiration times and removal on owner exit - - * all of which are optional. In addition, you can grab and release locks for - * values in the map, independently of who created them. + * A Group supports the concept of owner only updates(write locks), shared updates, entry expiration + * times and removal on owner exit - all of which are optional. In addition, you can grab and release locks for values + * in the map, independently of who created them. *

- * In addition, members of a group can broadcast messages and implement - * request/response with other Group instances. + * In addition, members of a group can broadcast messages and implement request/response with other Group + * instances. * *

* @@ -114,10 +107,8 @@ public class Group implements Map, Service { public static final long DEFAULT_HEART_BEAT_INTERVAL = 1000; private static final long EXPIRATION_SWEEP_INTERVAL = 500; private static final Log LOG = LogFactory.getLog(Group.class); - private static final String STATE_PREFIX = "STATE." + Group.class.getName() - + "."; - private static final String GROUP_MESSAGE_PREFIX = "MESSAGE." - + Group.class.getName() + "."; + private static final String STATE_PREFIX = "STATE." + Group.class.getName() + "."; + private static final String GROUP_MESSAGE_PREFIX = "MESSAGE." + Group.class.getName() + "."; private static final String STATE_TYPE = "state"; private static final String MESSAGE_TYPE = "message"; private static final String MEMBER_ID_PROPERTY = "memberId"; @@ -184,8 +175,7 @@ public class Group implements Map, Service { } /** - * Set the local map implementation to be used By default its a HashMap - - * but you could use a Cache for example + * Set the local map implementation to be used By default its a HashMap - but you could use a Cache for example * * @param map */ @@ -209,31 +199,22 @@ public class Group implements Map, Service { } } this.connection.start(); - this.stateSession = this.connection.createSession(false, - Session.AUTO_ACKNOWLEDGE); - this.messageSession = this.connection.createSession(false, - Session.AUTO_ACKNOWLEDGE); + this.stateSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + this.messageSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); this.stateProducer = this.stateSession.createProducer(null); this.stateProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); this.inboxTopic = this.stateSession.createTemporaryTopic(); String stateTopicName = STATE_PREFIX + this.groupName; this.stateTopic = this.stateSession.createTopic(stateTopicName); - this.heartBeatTopic = this.stateSession.createTopic(stateTopicName - + ".heartbeat"); - String messageDestinationName = GROUP_MESSAGE_PREFIX - + this.groupName; - this.messageTopic = this.messageSession - .createTopic(messageDestinationName); - this.messageQueue = this.messageSession - .createQueue(messageDestinationName); - MessageConsumer privateInbox = this.messageSession - .createConsumer(this.inboxTopic); - MessageConsumer memberChangeConsumer = this.stateSession - .createConsumer(this.stateTopic); + this.heartBeatTopic = this.stateSession.createTopic(stateTopicName + ".heartbeat"); + String messageDestinationName = GROUP_MESSAGE_PREFIX + this.groupName; + this.messageTopic = this.messageSession.createTopic(messageDestinationName); + this.messageQueue = this.messageSession.createQueue(messageDestinationName); + MessageConsumer privateInbox = this.messageSession.createConsumer(this.inboxTopic); + MessageConsumer memberChangeConsumer = this.stateSession.createConsumer(this.stateTopic); String memberId = null; if (memberChangeConsumer instanceof ActiveMQMessageConsumer) { - memberId = ((ActiveMQMessageConsumer) memberChangeConsumer) - .getConsumerId().toString(); + memberId = ((ActiveMQMessageConsumer) memberChangeConsumer).getConsumerId().toString(); } else { memberId = this.idGenerator.generateId(); } @@ -252,63 +233,53 @@ public class Group implements Map, Service { }); this.messageProducer = this.messageSession.createProducer(null); this.messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - MessageConsumer topicMessageConsumer = this.messageSession - .createConsumer(this.messageTopic); + MessageConsumer topicMessageConsumer = this.messageSession.createConsumer(this.messageTopic); topicMessageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { processJMSMessage(message); } }); - MessageConsumer queueMessageConsumer = this.messageSession - .createConsumer(this.messageQueue); + MessageConsumer queueMessageConsumer = this.messageSession.createConsumer(this.messageQueue); queueMessageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { processJMSMessage(message); } }); - MessageConsumer heartBeatConsumer = this.stateSession - .createConsumer(this.heartBeatTopic); + MessageConsumer heartBeatConsumer = this.stateSession.createConsumer(this.heartBeatTopic); heartBeatConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { handleHeartbeats(message); } }); - this.consumerEvents = new ConsumerEventSource(this.connection, - this.stateTopic); + this.consumerEvents = new ConsumerEventSource(this.connection, this.stateTopic); this.consumerEvents.setConsumerListener(new ConsumerListener() { public void onConsumerEvent(ConsumerEvent event) { handleConsumerEvents(event); } }); this.consumerEvents.start(); - this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L, - TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), - new ThreadFactory() { + this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), new ThreadFactory() { public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "Election{" - + Group.this.local + "}"); - thread.setDaemon(true); - return thread; - } - }); - this.stateExecutor = Executors - .newSingleThreadExecutor(new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "Group State{" - + Group.this.local + "}"); - thread.setDaemon(true); - return thread; - } - }); - this.messageExecutor = Executors - .newSingleThreadExecutor(new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, - "Group Messages{" + Group.this.local + "}"); + Thread thread = new Thread(runnable, "Election{" + Group.this.local + "}"); thread.setDaemon(true); return thread; } }); + this.stateExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "Group State{" + Group.this.local + "}"); + thread.setDaemon(true); + return thread; + } + }); + this.messageExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "Group Messages{" + Group.this.local + "}"); + thread.setDaemon(true); + return thread; + } + }); sendHeartBeat(); this.heartBeatTask = new SchedulerTimerTask(new Runnable() { public void run() { @@ -326,19 +297,13 @@ public class Group implements Map, Service { } }); this.timer = new Timer("Distributed heart beat", true); - this.timer.scheduleAtFixedRate(this.heartBeatTask, - getHeartBeatInterval() / 3, getHeartBeatInterval() / 2); - this.timer.scheduleAtFixedRate(this.checkMembershipTask, - getHeartBeatInterval(), getHeartBeatInterval()); - this.timer.scheduleAtFixedRate(this.expirationTask, - EXPIRATION_SWEEP_INTERVAL, EXPIRATION_SWEEP_INTERVAL); + this.timer.scheduleAtFixedRate(this.heartBeatTask, getHeartBeatInterval() / 3, getHeartBeatInterval() / 2); + this.timer.scheduleAtFixedRate(this.checkMembershipTask, getHeartBeatInterval(), getHeartBeatInterval()); + this.timer.scheduleAtFixedRate(this.expirationTask, EXPIRATION_SWEEP_INTERVAL, EXPIRATION_SWEEP_INTERVAL); // await for members to join - long timeout = (long) (this.heartBeatInterval - * this.minimumGroupSize *1.5); + long timeout = (long) (this.heartBeatInterval * this.minimumGroupSize * 1.5); long deadline = System.currentTimeMillis() + timeout; - while ((this.members.size() < this.minimumGroupSize || !this.electionFinished - .get()) - && timeout > 0) { + while ((this.members.size() < this.minimumGroupSize || !this.electionFinished.get()) && timeout > 0) { synchronized (this.electionFinished) { this.electionFinished.wait(timeout); } @@ -376,7 +341,6 @@ public class Group implements Map, Service { } catch (Exception e) { LOG.debug("Caught exception stopping", e); } - } } @@ -421,8 +385,7 @@ public class Group implements Map, Service { /** * @param alwaysLock - - * set true if objects inserted will always be locked (default is - * false) + * set true if objects inserted will always be locked (default is false) */ public void setAlwaysLock(boolean alwaysLock) { this.alwaysLock = alwaysLock; @@ -520,8 +483,7 @@ public class Group implements Map, Service { } /** - * Sets the policy for owned objects in the group If set to true, when this - * GroupMap stops, + * Sets the policy for owned objects in the group If set to true, when this GroupMap stops, * any objects it owns will be removed from the group map * @param removeOwnedObjectsOnExit the removeOwnedObjectsOnExit to set */ @@ -612,16 +574,14 @@ public class Group implements Map, Service { public boolean containsKey(Object key) { synchronized (this.mapMutex) { - return this.localMap != null ? this.localMap.containsKey(key) - : false; + return this.localMap != null ? this.localMap.containsKey(key) : false; } } public boolean containsValue(Object value) { EntryValue entryValue = new EntryValue(null, value); synchronized (this.mapMutex) { - return this.localMap != null ? this.localMap - .containsValue(entryValue) : false; + return this.localMap != null ? this.localMap.containsValue(entryValue) : false; } } @@ -669,10 +629,9 @@ public class Group implements Map, Service { * @throws IllegalStateException * */ - public V put(K key, V value) throws GroupUpdateException, - IllegalStateException { - return put(key, value, isAlwaysLock(), isRemoveOwnedObjectsOnExit(), - isReleaseLockOnExit(), getTimeToLive(), getLockTimeToLive()); + public V put(K key, V value) throws GroupUpdateException, IllegalStateException { + return put(key, value, isAlwaysLock(), isRemoveOwnedObjectsOnExit(), isReleaseLockOnExit(), getTimeToLive(), + getLockTimeToLive()); } /** @@ -690,9 +649,8 @@ public class Group implements Map, Service { * @throws IllegalStateException * */ - public V put(K key, V value, boolean lock, boolean removeOnExit, - boolean releaseLockOnExit, long timeToLive, long leaseTime) - throws GroupUpdateException, IllegalStateException { + public V put(K key, V value, boolean lock, boolean removeOnExit, boolean releaseLockOnExit, long timeToLive, + long leaseTime) throws GroupUpdateException, IllegalStateException { checkStatus(); EntryKey entryKey = new EntryKey(this.local, key); entryKey.setLocked(lock); @@ -756,10 +714,9 @@ public class Group implements Map, Service { * @throws GroupUpdateException * @throws IllegalStateException */ - public void putAll(Map t) - throws GroupUpdateException, IllegalStateException { - putAll(t, isAlwaysLock(), isRemoveOwnedObjectsOnExit(), - isReleaseLockOnExit(), getTimeToLive(), getLockTimeToLive()); + public void putAll(Map t) throws GroupUpdateException, IllegalStateException { + putAll(t, isAlwaysLock(), isRemoveOwnedObjectsOnExit(), isReleaseLockOnExit(), getTimeToLive(), + getLockTimeToLive()); } /** @@ -774,13 +731,10 @@ public class Group implements Map, Service { * @throws GroupUpdateException * @throws IllegalStateException */ - public void putAll(Map t, boolean lock, - boolean removeOnExit, boolean releaseLockOnExit, long timeToLive, - long lockTimeToLive) throws GroupUpdateException, - IllegalStateException { + public void putAll(Map t, boolean lock, boolean removeOnExit, boolean releaseLockOnExit, + long timeToLive, long lockTimeToLive) throws GroupUpdateException, IllegalStateException { for (java.util.Map.Entry entry : t.entrySet()) { - put(entry.getKey(), entry.getValue(), lock, removeOnExit, - releaseLockOnExit, timeToLive, lockTimeToLive); + put(entry.getKey(), entry.getValue(), lock, removeOnExit, releaseLockOnExit, timeToLive, lockTimeToLive); } } @@ -793,14 +747,12 @@ public class Group implements Map, Service { * @throws IllegalStateException * */ - public V remove(Object key) throws GroupUpdateException, - IllegalStateException { + public V remove(Object key) throws GroupUpdateException, IllegalStateException { EntryKey entryKey = new EntryKey(this.local, (K) key); return doRemove(entryKey); } - V doRemove(EntryKey key) throws GroupUpdateException, - IllegalStateException { + V doRemove(EntryKey key) throws GroupUpdateException, IllegalStateException { checkStatus(); EntryMessage entryMsg = new EntryMessage(); entryMsg.setKey(key); @@ -863,8 +815,7 @@ public class Group implements Map, Service { } /** - * @return the local member that represents this Group - * instance + * @return the local member that represents this Group instance */ public Member getLocalMember() { return this.local; @@ -881,8 +832,7 @@ public class Group implements Map, Service { } boolean result = false; if (entryValue != null) { - result = entryValue.getKey().getOwner().getId().equals( - this.local.getId()); + result = entryValue.getKey().getOwner().getId().equals(this.local.getId()); } return result; } @@ -934,8 +884,7 @@ public class Group implements Map, Service { */ public void broadcastMessage(Object message) throws JMSException { checkStatus(); - ObjectMessage objMsg = this.messageSession - .createObjectMessage((Serializable) message); + ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message); objMsg.setJMSCorrelationID(this.idGenerator.generateId()); objMsg.setJMSType(MESSAGE_TYPE); objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId()); @@ -952,8 +901,7 @@ public class Group implements Map, Service { * @return * @throws JMSException */ - public Serializable broadcastMessageRequest(Object message, long timeout) - throws JMSException { + public Serializable broadcastMessageRequest(Object message, long timeout) throws JMSException { checkStatus(); Object result = null; MapRequest request = new MapRequest(); @@ -961,8 +909,7 @@ public class Group implements Map, Service { synchronized (this.messageRequests) { this.messageRequests.put(id, request); } - ObjectMessage objMsg = this.stateSession - .createObjectMessage((Serializable) message); + ObjectMessage objMsg = this.stateSession.createObjectMessage((Serializable) message); objMsg.setJMSReplyTo(this.inboxTopic); objMsg.setJMSCorrelationID(id); objMsg.setJMSType(MESSAGE_TYPE); @@ -973,16 +920,14 @@ public class Group implements Map, Service { } /** - * Send a message to the group - but only the least loaded member will - * process it + * Send a message to the group - but only the least loaded member will process it * * @param message * @throws JMSException */ public void sendMessage(Object message) throws JMSException { checkStatus(); - ObjectMessage objMsg = this.messageSession - .createObjectMessage((Serializable) message); + ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message); objMsg.setJMSCorrelationID(this.idGenerator.generateId()); objMsg.setJMSType(MESSAGE_TYPE); objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId()); @@ -998,8 +943,7 @@ public class Group implements Map, Service { */ public void sendMessage(Member member, Object message) throws JMSException { checkStatus(); - ObjectMessage objMsg = this.messageSession - .createObjectMessage((Serializable) message); + ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message); objMsg.setJMSCorrelationID(this.idGenerator.generateId()); objMsg.setJMSType(MESSAGE_TYPE); objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId()); @@ -1016,8 +960,7 @@ public class Group implements Map, Service { * @return the request or null * @throws JMSException */ - public Object sendMessageRequest(Member member, Object message, long timeout) - throws JMSException { + public Object sendMessageRequest(Member member, Object message, long timeout) throws JMSException { checkStatus(); Object result = null; MapRequest request = new MapRequest(); @@ -1025,8 +968,7 @@ public class Group implements Map, Service { synchronized (this.messageRequests) { this.messageRequests.put(id, request); } - ObjectMessage objMsg = this.stateSession - .createObjectMessage((Serializable) message); + ObjectMessage objMsg = this.stateSession.createObjectMessage((Serializable) message); objMsg.setJMSReplyTo(this.inboxTopic); objMsg.setJMSCorrelationID(id); objMsg.setJMSType(MESSAGE_TYPE); @@ -1044,11 +986,9 @@ public class Group implements Map, Service { * @param message * @throws JMSException */ - public void sendMessageResponse(Member member, String replyId, - Object message) throws JMSException { + public void sendMessageResponse(Member member, String replyId, Object message) throws JMSException { checkStatus(); - ObjectMessage objMsg = this.messageSession - .createObjectMessage((Serializable) message); + ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message); objMsg.setJMSCorrelationID(replyId); objMsg.setJMSType(MESSAGE_TYPE); objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId()); @@ -1056,24 +996,21 @@ public class Group implements Map, Service { } /** - * Select a coordinator - coordinator weighting is used - or if everything - * is equal - a comparison of member ids. + * Select a coordinator - coordinator weighting is used - or if everything is equal - a comparison of member ids. * * @param members * @return */ protected Member selectCordinator(List list) { List sorted = sortMemberList(list); - Member result = sorted.isEmpty() ? this.local : sorted - .get(list.size() - 1); + Member result = sorted.isEmpty() ? this.local : sorted.get(list.size() - 1); return result; } protected List sortMemberList(List list) { Collections.sort(list, new Comparator() { public int compare(Member m1, Member m2) { - int result = m1.getCoordinatorWeight() - - m2.getCoordinatorWeight(); + int result = m1.getCoordinatorWeight() - m2.getCoordinatorWeight(); if (result == 0) { result = m1.getId().compareTo(m2.getId()); } @@ -1091,8 +1028,7 @@ public class Group implements Map, Service { this.stateRequests.put(id, request); } try { - ObjectMessage objMsg = this.stateSession - .createObjectMessage(payload); + ObjectMessage objMsg = this.stateSession.createObjectMessage(payload); objMsg.setJMSReplyTo(this.inboxTopic); objMsg.setJMSCorrelationID(id); objMsg.setJMSType(STATE_TYPE); @@ -1113,8 +1049,7 @@ public class Group implements Map, Service { return result; } - void sendAsyncStateRequest(AsyncMapRequest asyncRequest, Member member, - Serializable payload) { + void sendAsyncStateRequest(AsyncMapRequest asyncRequest, Member member, Serializable payload) { MapRequest request = new MapRequest(); String id = this.idGenerator.generateId(); asyncRequest.add(id, request); @@ -1122,8 +1057,7 @@ public class Group implements Map, Service { this.stateRequests.put(id, request); } try { - ObjectMessage objMsg = this.stateSession - .createObjectMessage(payload); + ObjectMessage objMsg = this.stateSession.createObjectMessage(payload); objMsg.setJMSReplyTo(this.inboxTopic); objMsg.setJMSCorrelationID(id); objMsg.setJMSType(STATE_TYPE); @@ -1142,8 +1076,7 @@ public class Group implements Map, Service { processRequest(id, reply); } else { try { - ObjectMessage replyMsg = this.stateSession - .createObjectMessage((Serializable) reply); + ObjectMessage replyMsg = this.stateSession.createObjectMessage((Serializable) reply); replyMsg.setJMSCorrelationID(id); replyMsg.setJMSType(STATE_TYPE); this.stateProducer.send(replyTo, replyMsg); @@ -1162,8 +1095,7 @@ public class Group implements Map, Service { try { EntryMessage copy = entry.copy(); copy.setMapUpdate(true); - ObjectMessage objMsg = this.stateSession - .createObjectMessage(copy); + ObjectMessage objMsg = this.stateSession.createObjectMessage(copy); objMsg.setJMSCorrelationID(correlationId); objMsg.setJMSType(STATE_TYPE); this.stateProducer.send(this.stateTopic, objMsg); @@ -1230,22 +1162,18 @@ public class Group implements Map, Service { } } - void processLockUpdate(EntryMessage entryMsg, Destination replyTo, - String correlationId) { + void processLockUpdate(EntryMessage entryMsg, Destination replyTo, String correlationId) { waitForElection(); synchronized (this.mapMutex) { boolean newLock = entryMsg.getKey().isLocked(); Member newOwner = entryMsg.getKey().getOwner(); - long newLockExpiration = newLock ? entryMsg.getKey() - .getLockExpiration() : 0l; + long newLockExpiration = newLock ? entryMsg.getKey().getLockExpiration() : 0l; if (isCoordinator() && !entryMsg.isMapUpdate()) { EntryKey originalKey = getKey(entryMsg.getKey().getKey()); if (originalKey != null) { if (originalKey.isLocked()) { - if (!originalKey.getOwner().equals( - entryMsg.getKey().getOwner())) { - Serializable reply = new GroupUpdateException( - "Owned by " + originalKey.getOwner()); + if (!originalKey.getOwner().equals(entryMsg.getKey().getOwner())) { + Serializable reply = new GroupUpdateException("Owned by " + originalKey.getOwner()); sendReply(reply, replyTo, correlationId); } else { originalKey.setLocked(newLock); @@ -1271,13 +1199,11 @@ public class Group implements Map, Service { } } - void processEntryMessage(EntryMessage entryMsg, Destination replyTo, - String correlationId) { + void processEntryMessage(EntryMessage entryMsg, Destination replyTo, String correlationId) { waitForElection(); if (isCoordinator()) { EntryKey key = entryMsg.getKey(); - EntryValue value = new EntryValue(key, (V) entryMsg - .getValue()); + EntryValue value = new EntryValue(key, (V) entryMsg.getValue()); boolean insert = entryMsg.isInsert(); boolean containsKey = false; synchronized (this.mapMutex) { @@ -1285,8 +1211,7 @@ public class Group implements Map, Service { } if (containsKey) { EntryKey originalKey = getKey(key.getKey()); - if (originalKey.equals(key.getOwner()) - || !originalKey.isLocked()) { + if (originalKey.equals(key.getOwner()) || !originalKey.isLocked()) { EntryValue old = null; if (insert) { synchronized (this.mapMutex) { @@ -1299,11 +1224,9 @@ public class Group implements Map, Service { } entryMsg.setOldValue(old.getValue()); broadcastMapUpdate(entryMsg, correlationId); - fireMapChanged(key.getOwner(), key.getKey(), - old.getValue(), value.getValue(), false); + fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), value.getValue(), false); } else { - Serializable reply = new GroupUpdateException( - "Owned by " + originalKey.getOwner()); + Serializable reply = new GroupUpdateException("Owned by " + originalKey.getOwner()); sendReply(reply, replyTo, correlationId); } } else { @@ -1312,8 +1235,7 @@ public class Group implements Map, Service { this.localMap.put(key.getKey(), value); } broadcastMapUpdate(entryMsg, correlationId); - fireMapChanged(key.getOwner(), key.getKey(), null, value - .getValue(), false); + fireMapChanged(key.getOwner(), key.getKey(), null, value.getValue(), false); } else { sendReply(null, replyTo, correlationId); } @@ -1349,24 +1271,20 @@ public class Group implements Map, Service { value.setValue(null); } } - fireMapChanged(key.getOwner(), key.getKey(), - old.getValue(), value.getValue(), entryMsg - .isExpired()); + fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), value.getValue(), entryMsg.isExpired()); } } else { if (insert) { synchronized (this.mapMutex) { this.localMap.put(key.getKey(), value); } - fireMapChanged(key.getOwner(), key.getKey(), null, value - .getValue(), false); + fireMapChanged(key.getOwner(), key.getKey(), null, value.getValue(), false); } } } } - void processGroupMessage(String memberId, String replyId, - Destination replyTo, Object payload) { + void processGroupMessage(String memberId, String replyId, Destination replyTo, Object payload) { Member member = this.members.get(memberId); if (member != null) { fireMemberMessage(member, replyId, payload); @@ -1412,8 +1330,7 @@ public class Group implements Map, Service { void handleConsumerEvents(ConsumerEvent event) { if (!event.isStarted()) { - Member member = this.members.remove(event.getConsumerId() - .toString()); + Member member = this.members.remove(event.getConsumerId().toString()); if (member != null) { fireMemberStopped(member); election(member, false); @@ -1423,8 +1340,7 @@ public class Group implements Map, Service { void checkMembership() { if (this.started.get() && this.electionFinished.get()) { - long checkTime = System.currentTimeMillis() - - getHeartBeatInterval(); + long checkTime = System.currentTimeMillis() - getHeartBeatInterval(); boolean doElection = false; for (Member member : this.members.values()) { if (member.getTimeStamp() < checkTime) { @@ -1442,8 +1358,7 @@ public class Group implements Map, Service { void expirationSweep() { waitForElection(); - if (isCoordinator() && this.started.get() - && this.electionFinished.get()) { + if (isCoordinator() && this.started.get() && this.electionFinished.get()) { List expiredMessages = null; List expiredLocks = null; synchronized (this.mapMutex) { @@ -1455,13 +1370,12 @@ public class Group implements Map, Service { if (k.isExpired(currentTime)) { if (expiredMessages == null) { expiredMessages = new ArrayList(); - expiredMessages.add(k); } + expiredMessages.add(k); } else if (k.isLockExpired(currentTime)) { k.setLocked(false); if (expiredLocks == null) { expiredLocks = new ArrayList(); - expiredLocks.add(k); } expiredLocks.add(k); } @@ -1489,8 +1403,7 @@ public class Group implements Map, Service { } void doMessageExpiration(List list) { - if (this.started.get() && this.electionFinished.get() - && isCoordinator()) { + if (this.started.get() && this.electionFinished.get() && isCoordinator()) { for (EntryKey k : list) { EntryValue old = null; synchronized (this.mapMutex) { @@ -1503,16 +1416,14 @@ public class Group implements Map, Service { entryMsg.setKey(k); entryMsg.setValue(old.getValue()); broadcastMapUpdate(entryMsg, ""); - fireMapChanged(k.getOwner(), k.getKey(), old.getValue(), - null, true); + fireMapChanged(k.getOwner(), k.getKey(), old.getValue(), null, true); } } } } void doLockExpiration(List list) { - if (this.started.get() && this.electionFinished.get() - && isCoordinator()) { + if (this.started.get() && this.electionFinished.get() && isCoordinator()) { for (EntryKey k : list) { EntryMessage entryMsg = new EntryMessage(); entryMsg.setType(EntryMessage.MessageType.DELETE); @@ -1530,8 +1441,7 @@ public class Group implements Map, Service { void sendHeartBeat(Destination destination) { if (this.started.get()) { try { - ObjectMessage msg = this.stateSession - .createObjectMessage(this.local); + ObjectMessage msg = this.stateSession.createObjectMessage(this.local); msg.setJMSType(STATE_TYPE); this.stateProducer.send(destination, msg); } catch (javax.jms.IllegalStateException e) { @@ -1548,8 +1458,7 @@ public class Group implements Map, Service { List>> list = new ArrayList>>(); synchronized (this.mapMutex) { if (this.localMap != null) { - for (Map.Entry> entry : this.localMap - .entrySet()) { + for (Map.Entry> entry : this.localMap.entrySet()) { list.add(entry); } } @@ -1561,12 +1470,10 @@ public class Group implements Map, Service { entryMsg.setValue(entry.getValue().getValue()); entryMsg.setType(EntryMessage.MessageType.SYNC); entryMsg.setMapUpdate(true); - ObjectMessage objMsg = this.stateSession - .createObjectMessage(entryMsg); + ObjectMessage objMsg = this.stateSession.createObjectMessage(entryMsg); if (!member.equals(entry.getValue().getKey().getOwner())) { objMsg.setJMSType(STATE_TYPE); - this.stateProducer.send(member.getInBoxDestination(), - objMsg); + this.stateProducer.send(member.getInBoxDestination(), objMsg); } } } catch (javax.jms.IllegalStateException e) { @@ -1615,16 +1522,13 @@ public class Group implements Map, Service { synchronized (this.mapMutex) { value = this.localMap.remove(entryKey); } - fireMapChanged(member, entryKey.getKey(), value.getValue(), - null, false); + fireMapChanged(member, entryKey.getKey(), value.getValue(), null, false); } } } - void fireMemberMessage(final Member member, final String replyId, - final Object message) { - if (this.started.get() && this.stateExecutor != null - && !this.messageExecutor.isShutdown()) { + void fireMemberMessage(final Member member, final String replyId, final Object message) { + if (this.started.get() && this.stateExecutor != null && !this.messageExecutor.isShutdown()) { this.messageExecutor.execute(new Runnable() { public void run() { doFireMemberMessage(member, replyId, message); @@ -1641,10 +1545,9 @@ public class Group implements Map, Service { } } - void fireMapChanged(final Member owner, final Object key, - final Object oldValue, final Object newValue, final boolean expired) { - if (this.started.get() && this.stateExecutor != null - && !this.stateExecutor.isShutdown()) { + void fireMapChanged(final Member owner, final Object key, final Object oldValue, final Object newValue, + final boolean expired) { + if (this.started.get() && this.stateExecutor != null && !this.stateExecutor.isShutdown()) { this.stateExecutor.execute(new Runnable() { public void run() { doFireMapChanged(owner, key, oldValue, newValue, expired); @@ -1653,8 +1556,7 @@ public class Group implements Map, Service { } } - void doFireMapChanged(Member owner, Object key, Object oldValue, - Object newValue, boolean expired) { + void doFireMapChanged(Member owner, Object key, Object oldValue, Object newValue, boolean expired) { if (this.started.get()) { for (GroupStateChangedListener l : this.mapChangedListeners) { if (oldValue == null) { @@ -1670,28 +1572,24 @@ public class Group implements Map, Service { void checkStatus() throws IllegalStateException { if (!started.get()) { - throw new IllegalStateException("GroupMap " + this.local.getName() - + " not started"); + throw new IllegalStateException("GroupMap " + this.local.getName() + " not started"); } waitForElection(); } public String toString() { - return "Group:" + getName() + "{id=" + this.local.getId() - + ",coordinator=" + isCoordinator() + ",inbox=" + return "Group:" + getName() + "{id=" + this.local.getId() + ",coordinator=" + isCoordinator() + ",inbox=" + this.local.getInBoxDestination() + "}"; } void election(final Member member, final boolean memberStarted) { - if (this.started.get() && this.stateExecutor != null - && !this.electionExecutor.isShutdown()) { + if (this.started.get() && this.electionExecutor != null && !this.electionExecutor.isShutdown()) { synchronized (this.electionFinished) { this.electionFinished.set(false); } synchronized (this.electionExecutor) { // remove any queued election tasks - List list = new ArrayList( - this.electionExecutor.getQueue()); + List list = new ArrayList(this.electionExecutor.getQueue()); for (Runnable r : list) { ElectionService es = (ElectionService) r; es.stop(); @@ -1723,8 +1621,7 @@ public class Group implements Map, Service { return result; } - void processElectionMessage(ElectionMessage msg, Destination replyTo, - String correlationId) { + void processElectionMessage(ElectionMessage msg, Destination replyTo, String correlationId) { if (msg.isElection()) { msg.setType(ElectionMessage.MessageType.ANSWER); msg.setMember(this.local); @@ -1745,16 +1642,14 @@ public class Group implements Map, Service { ElectionMessage msg = new ElectionMessage(); msg.setMember(this.local); msg.setType(type); - ObjectMessage objMsg = this.stateSession - .createObjectMessage(msg); + ObjectMessage objMsg = this.stateSession.createObjectMessage(msg); objMsg.setJMSType(STATE_TYPE); this.stateProducer.send(this.stateTopic, objMsg); } catch (javax.jms.IllegalStateException e) { // ignore - we are stopping } catch (JMSException e) { if (this.started.get()) { - LOG.error("Failed to broadcast election message: " + type, - e); + LOG.error("Failed to broadcast election message: " + type, e); } } } @@ -1795,30 +1690,26 @@ public class Group implements Map, Service { } void doElection() { - if ((this.member == null || (!this.member.equals(Group.this.local) || Group.this.members - .size() == getMinimumGroupSize()))) { + if ((this.member == null || (!this.member.equals(Group.this.local) || Group.this.members.size() == getMinimumGroupSize()))) { boolean wasCoordinator = isCoordinatorMatch() && !isEmpty(); // call an election while (!callElection() && isStarted() && this.started.get()) ; if (isStarted() && this.started.get()) { - List members = new ArrayList( - Group.this.members.values()); + List members = new ArrayList(Group.this.members.values()); Group.this.coordinator = selectCordinator(members); if (isCoordinatorMatch()) { broadcastElectionType(ElectionMessage.MessageType.COORDINATOR); } if (this.memberStarted && this.member != null) { - if (wasCoordinator || isCoordinator() - && this.started.get()) { + if (wasCoordinator || isCoordinator() && this.started.get()) { updateNewMemberMap(this.member); } } if (!isElectionFinished() && this.started.get()) { try { synchronized (Group.this.electionFinished) { - Group.this.electionFinished - .wait(Group.this.heartBeatInterval * 2); + Group.this.electionFinished.wait(Group.this.heartBeatInterval * 2); } } catch (InterruptedException e) { }