From 17034f2b04c15ad9d92666515f946421c1a27f67 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Wed, 13 Aug 2008 18:15:47 +0000 Subject: [PATCH] group moving from trunk to the sandbox git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@685632 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/group/AsyncMapRequest.java | 61 - .../group/DefaultMapChangedListener.java | 33 - .../activemq/group/ElectionMessage.java | 105 - .../org/apache/activemq/group/EntryKey.java | 210 -- .../apache/activemq/group/EntryMessage.java | 204 -- .../org/apache/activemq/group/EntryValue.java | 70 - .../java/org/apache/activemq/group/Group.java | 1742 ----------------- .../group/GroupMapUpdateException.java | 32 - .../activemq/group/GroupMessageListener.java | 34 - .../activemq/group/MapChangedListener.java | 50 - .../org/apache/activemq/group/MapRequest.java | 62 - .../org/apache/activemq/group/Member.java | 162 -- .../activemq/group/MemberChangedListener.java | 37 - .../activemq/group/RequestCallback.java | 30 - .../org/apache/activemq/group/package.html | 25 - 15 files changed, 2857 deletions(-) delete mode 100644 activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/group/Group.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/group/GroupMessageListener.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/group/Member.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java delete mode 100755 activemq-core/src/main/java/org/apache/activemq/group/package.html diff --git a/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java b/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java deleted file mode 100644 index 05c90ed23d..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.group; - -import java.util.HashSet; -import java.util.Set; - -/** - * Return information about map update - * - */ -public class AsyncMapRequest implements RequestCallback{ - private final Object mutex = new Object(); - - private Set requests = new HashSet(); - - public void add(String id, MapRequest request) { - request.setCallback(this); - this.requests.add(id); - } - - /** - * Wait for requests - * @param timeout - * @return - */ - public boolean isSuccess(long timeout) { - long deadline = System.currentTimeMillis() + timeout; - while (!this.requests.isEmpty()) { - synchronized (this.mutex) { - try { - this.mutex.wait(timeout); - } catch (InterruptedException e) { - break; - } - } - timeout = Math.max(deadline - System.currentTimeMillis(), 0); - } - return this.requests.isEmpty(); - } - - - public void finished(String id) { - this.requests.remove(id); - - } -} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java b/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java deleted file mode 100644 index eecfa0c8b7..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.group; - -/** - * Default implementation of a MapChangedListener - * - */ -public class DefaultMapChangedListener implements MapChangedListener{ - - public void mapInsert(Member owner, Object key, Object value) { - } - - public void mapRemove(Member owner, Object key, Object value,boolean expired) { - } - - public void mapUpdate(Member owner, Object key, Object oldValue,Object newValue) { - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java b/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java deleted file mode 100644 index 497bdececc..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.group; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; - -/** - * Used to pass information around - * - */ -public class ElectionMessage implements Externalizable{ - static enum MessageType{ELECTION,ANSWER,COORDINATOR}; - private Member member; - private MessageType type; - - /** - * @return the member - */ - public Member getMember() { - return this.member; - } - - /** - * @param member the member to set - */ - public void setMember(Member member) { - this.member = member; - } - - /** - * @return the type - */ - public MessageType getType() { - return this.type; - } - - /** - * @param type the type to set - */ - public void setType(MessageType type) { - this.type = type; - } - - /** - * @return true if election message - */ - public boolean isElection() { - return this.type != null && this.type.equals(MessageType.ELECTION); - } - - /** - * @return true if answer message - */ - public boolean isAnswer() { - return this.type != null && this.type.equals(MessageType.ANSWER); - } - - /** - * @return true if coordinator message - */ - public boolean isCoordinator() { - return this.type != null && this.type.equals(MessageType.COORDINATOR); - } - - - public ElectionMessage copy() { - ElectionMessage result = new ElectionMessage(); - result.member=this.member; - result.type=this.type; - return result; - } - - - public void readExternal(ObjectInput in) throws IOException, - ClassNotFoundException { - this.member=(Member) in.readObject(); - this.type=(MessageType) in.readObject(); - } - - public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(this.member); - out.writeObject(this.type); - } - - public String toString() { - return "ElectionMessage: "+ this.member + "{"+this.type+ "}"; - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java b/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java deleted file mode 100644 index 996277e105..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java +++ /dev/null @@ -1,210 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.group; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; - -/** - * Holds information about an EntryKey - * - */ -class EntryKey implements Externalizable { - private Member owner; - private K key; - private boolean locked; - private boolean removeOnExit; - private boolean releaseLockOnExit; - private long expiration; - private long lockExpiration; - - /** - * Default constructor - for serialization - */ - public EntryKey() { - } - - EntryKey(Member owner, K key) { - this.owner = owner; - this.key = key; - } - - public int hashCode() { - return this.key != null ? this.key.hashCode() : super.hashCode(); - } - - /** - * @return the owner - */ - public Member getOwner() { - return this.owner; - } - - public void setOwner(Member member) { - this.owner=member; - } - - /** - * @return the key - */ - public K getKey() { - return this.key; - } - - /** - * @return the share - */ - public boolean isLocked() { - return this.locked; - } - - /** - * @param share the share to set - */ - public void setLocked(boolean locked) { - this.locked = locked; - } - - /** - * @return the removeOnExit - */ - public boolean isRemoveOnExit() { - return this.removeOnExit; - } - - /** - * @param removeOnExit - * the removeOnExit to set - */ - public void setRemoveOnExit(boolean removeOnExit) { - this.removeOnExit = removeOnExit; - } - - /** - * @return the expiration - */ - public long getExpiration() { - return expiration; - } - - /** - * @param expiration the expiration to set - */ - public void setExpiration(long expiration) { - this.expiration = expiration; - } - - /** - * @return the lockExpiration - */ - public long getLockExpiration() { - return lockExpiration; - } - - /** - * @param lockExpiration the lockExpiration to set - */ - public void setLockExpiration(long lockExpiration) { - this.lockExpiration = lockExpiration; - } - - /** - * @return the releaseLockOnExit - */ - public boolean isReleaseLockOnExit() { - return releaseLockOnExit; - } - - /** - * @param releaseLockOnExit the releaseLockOnExit to set - */ - public void setReleaseLockOnExit(boolean releaseLockOnExit) { - this.releaseLockOnExit = releaseLockOnExit; - } - - void setTimeToLive(long ttl) { - if (ttl > 0 ) { - this.expiration=ttl+System.currentTimeMillis(); - }else { - this.expiration =0l; - } - } - - void setLockTimeToLive(long ttl) { - if(ttl > 0) { - this.lockExpiration=ttl+System.currentTimeMillis(); - }else { - this.lockExpiration=0l; - } - } - - boolean isExpired() { - return isExpired(System.currentTimeMillis()); - } - - boolean isExpired(long currentTime) { - return this.expiration > 0 && this.expiration < currentTime; - } - - boolean isLockExpired() { - return isLockExpired(System.currentTimeMillis()); - } - - boolean isLockExpired(long currentTime) { - return this.lockExpiration > 0 && this.lockExpiration < currentTime; - } - - - - public boolean equals(Object obj) { - boolean result = false; - if (obj instanceof EntryKey) { - EntryKey other = (EntryKey) obj; - result = other.key.equals(this.key); - } - return result; - } - - public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(this.owner); - out.writeObject(this.key); - out.writeBoolean(isLocked()); - out.writeBoolean(isRemoveOnExit()); - out.writeBoolean(isReleaseLockOnExit()); - out.writeLong(getExpiration()); - out.writeLong(getLockExpiration()); - } - - public void readExternal(ObjectInput in) throws IOException, - ClassNotFoundException { - this.owner = (Member) in.readObject(); - this.key = (K) in.readObject(); - this.locked = in.readBoolean(); - this.removeOnExit=in.readBoolean(); - this.releaseLockOnExit=in.readBoolean(); - this.expiration=in.readLong(); - this.lockExpiration=in.readLong(); - } - - public String toString() { - return "key:"+this.key; - } - - -} diff --git a/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java b/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java deleted file mode 100644 index ef569762f5..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java +++ /dev/null @@ -1,204 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.group; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; - -/** - * Used to pass information around - * - */ -public class EntryMessage implements Externalizable{ - static enum MessageType{INSERT,DELETE,SYNC}; - private EntryKey key; - private Object value; - private Object oldValue; - private MessageType type; - private boolean mapUpdate; - private boolean expired; - private boolean lockExpired; - private boolean lockUpdate; - - /** - * @return the owner - */ - public EntryKey getKey() { - return this.key; - } - /** - * @param key - */ - public void setKey(EntryKey key) { - this.key = key; - } - /** - * @return the value - */ - public Object getValue() { - return this.value; - } - /** - * @param value the value to set - */ - public void setValue(Object value) { - this.value = value; - } - - /** - * @return the oldValue - */ - public Object getOldValue() { - return this.oldValue; - } - /** - * @param oldValue the oldValue to set - */ - public void setOldValue(Object oldValue) { - this.oldValue = oldValue; - } - - /** - * @return the type - */ - public MessageType getType() { - return this.type; - } - /** - * @param type the type to set - */ - public void setType(MessageType type) { - this.type = type; - } - - /** - * @return the mapUpdate - */ - public boolean isMapUpdate() { - return this.mapUpdate; - } - /** - * @param mapUpdate the mapUpdate to set - */ - public void setMapUpdate(boolean mapUpdate) { - this.mapUpdate = mapUpdate; - } - - /** - * @return the expired - */ - public boolean isExpired() { - return expired; - } - /** - * @param expired the expired to set - */ - public void setExpired(boolean expired) { - this.expired = expired; - } - - /** - * @return the lockExpired - */ - public boolean isLockExpired() { - return lockExpired; - } - /** - * @param lockExpired the lockExpired to set - */ - public void setLockExpired(boolean lockExpired) { - this.lockExpired = lockExpired; - } - - /** - * @return the lockUpdate - */ - public boolean isLockUpdate() { - return lockUpdate; - } - /** - * @param lockUpdate the lockUpdate to set - */ - public void setLockUpdate(boolean lockUpdate) { - this.lockUpdate = lockUpdate; - } - - /** - * @return if insert message - */ - public boolean isInsert() { - return this.type != null && this.type.equals(MessageType.INSERT); - } - - /** - * @return true if delete message - */ - public boolean isDelete() { - return this.type != null && this.type.equals(MessageType.DELETE); - } - - public boolean isSync() { - return this.type != null && this.type.equals(MessageType.SYNC); - } - - - public EntryMessage copy() { - EntryMessage result = new EntryMessage(); - result.key=this.key; - result.value=this.value; - result.oldValue=this.oldValue; - result.type=this.type; - result.mapUpdate=this.mapUpdate; - result.expired=this.expired; - result.lockExpired=this.lockExpired; - result.lockUpdate=this.lockUpdate; - return result; - } - - - - public void readExternal(ObjectInput in) throws IOException, - ClassNotFoundException { - this.key=(EntryKey) in.readObject(); - this.value=in.readObject(); - this.oldValue=in.readObject(); - this.type=(MessageType) in.readObject(); - this.mapUpdate=in.readBoolean(); - this.expired=in.readBoolean(); - this.lockExpired=in.readBoolean(); - this.lockUpdate=in.readBoolean(); - } - - public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(this.key); - out.writeObject(this.value); - out.writeObject(this.oldValue); - out.writeObject(this.type); - out.writeBoolean(this.mapUpdate); - out.writeBoolean(this.expired); - out.writeBoolean(this.lockExpired); - out.writeBoolean(this.lockUpdate); - } - - public String toString() { - return "EntryMessage: "+this.type + "[" + this.key + "," + this.value + - "]{update=" + this.mapUpdate + "}"; - } - -} diff --git a/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java b/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java deleted file mode 100644 index 791f67d360..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY VIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.group; - - -/** - * Holds information about the Value in the Map - * - */ -class EntryValue { - private EntryKey key; - private V value; - - - EntryValue(EntryKey key, V value){ - this.key=key; - this.value=value; - } - - /** - * @return the owner - */ - public EntryKey getKey() { - return this.key; - } - - /** - * @return the key - */ - public V getValue() { - return this.value; - } - - /** - * set the value - * @param value - */ - public void setValue(V value) { - this.value=value; - } - - public int hashCode() { - return this.value != null ? this.value.hashCode() : super.hashCode(); - } - - public boolean equals(Object obj) { - boolean result = false; - if (obj instanceof EntryValue) { - EntryValue other = (EntryValue)obj; - result = (this.value==null && other.value==null) || - (this.value != null && other.value != null && this.value.equals(other.value)); - } - return result; - } -} - diff --git a/activemq-core/src/main/java/org/apache/activemq/group/Group.java b/activemq-core/src/main/java/org/apache/activemq/group/Group.java deleted file mode 100644 index 416c5405fd..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/group/Group.java +++ /dev/null @@ -1,1742 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.group; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Timer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.Topic; -import org.apache.activemq.ActiveMQMessageConsumer; -import org.apache.activemq.Service; -import org.apache.activemq.advisory.ConsumerEvent; -import org.apache.activemq.advisory.ConsumerEventSource; -import org.apache.activemq.advisory.ConsumerListener; -import org.apache.activemq.thread.SchedulerTimerTask; -import org.apache.activemq.util.IdGenerator; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - *

- * A Group is a distributed collaboration implementation that is - * used to shared state and process messages amongst a distributed group of - * other Group instances. - * Membership of a group is handled automatically using discovery. - *

- * The underlying transport is JMS and there are some optimizations that occur - * for membership if used with ActiveMQ - but Group can be used - * with any JMS implementation. - * - *

- * Updates to the group shared map are controlled by a coordinator. The - * coordinator is elected by the member with the lowest lexicographical id - - * based on the bully algorithm [Silberschatz et al. 1993] - *

- * The {@link #selectCordinator(Collection members)} method may be - * overridden to implement a custom mechanism for choosing how the coordinator - * is elected for the map. - *

- * New Group instances have their state updated by the coordinator, - * and coordinator failure is handled automatically within the group. - *

- * All map updates are totally ordered through the coordinator, whilst read operations - * happen locally. - *

- * A Group supports the concept of owner only updates(write locks), - * shared updates, entry expiration times and removal on owner exit - - * all of which are optional. In addition, you can grab and release locks for - * values in the map, independently of who created them. - *

- * In addition, members of a group can broadcast messages and implement request/response - * with other Group instances. - * - *

- * - * @param the key type - * @param the value type - * - */ -public class Group implements Map, Service { - /** - * default interval within which to detect a member failure - */ - public static final long DEFAULT_HEART_BEAT_INTERVAL = 2000; - private static final long EXPIRATION_SWEEP_INTERVAL = 500; - private static final Log LOG = LogFactory.getLog(Group.class); - private static final String STATE_PREFIX = "STATE."+Group.class.getName()+ "."; - private static final String GROUP_MESSAGE_PREFIX = "MESSAGE."+Group.class.getName()+ "."; - private static final String STATE_TYPE = "state"; - private static final String MESSAGE_TYPE = "message"; - private static final String MEMBER_ID_PROPERTY="memberId"; - protected Member local; - private final Object mapMutex = new Object(); - private Map> localMap; - private Map members = new ConcurrentHashMap(); - private Map stateRequests = new HashMap(); - private Map messageRequests = new HashMap(); - private List membershipListeners = new CopyOnWriteArrayList(); - private List mapChangedListeners = new CopyOnWriteArrayList(); - private List groupMessageListeners = new CopyOnWriteArrayList(); - - private Member coordinator; - private String groupName; - private boolean alwaysLock; - private Connection connection; - private Session stateSession; - private Session messageSession; - private Topic stateTopic; - private Topic heartBeatTopic; - private Topic inboxTopic; - private Topic messageTopic; - private Queue messageQueue; - private MessageProducer stateProducer; - private MessageProducer messageProducer; - private ConsumerEventSource consumerEvents; - private AtomicBoolean started = new AtomicBoolean(); - private SchedulerTimerTask heartBeatTask; - private SchedulerTimerTask checkMembershipTask; - private SchedulerTimerTask expirationTask; - private Timer timer; - private long heartBeatInterval = DEFAULT_HEART_BEAT_INTERVAL; - private IdGenerator idGenerator = new IdGenerator(); - private boolean removeOwnedObjectsOnExit; - private boolean releaseLockOnExit=true; - private int timeToLive; - private int lockTimeToLive; - private int minimumGroupSize = 1; - private int coordinatorWeight=0; - private final AtomicBoolean electionFinished = new AtomicBoolean(true); - private ExecutorService stateExecutor; - private ExecutorService messageExecutor; - private ThreadPoolExecutor electionExecutor; - private final Object memberMutex = new Object(); - - /** - * @param connection - * @param name - */ - public Group(Connection connection, String name) { - this(connection, "default", name); - } - - /** - * @param connection - * @param groupName - * @param name - */ - public Group(Connection connection, String groupName, String name) { - this.connection = connection; - this.local = new Member(name); - this.coordinator = this.local; - this.groupName = groupName; - } - - /** - * Set the local map implementation to be used By default its a HashMap - - * but you could use a Cache for example - * - * @param map - */ - public void setLocalMap(Map map) { - synchronized (this.mapMutex) { - this.localMap = map; - } - } - - /** - * Start membership to the group - * - * @throws Exception - * - */ - public void start() throws Exception { - if (this.started.compareAndSet(false, true)) { - synchronized (this.mapMutex) { - if (this.localMap == null) { - this.localMap = new HashMap>(); - } - } - this.connection.start(); - this.stateSession = this.connection.createSession(false, - Session.AUTO_ACKNOWLEDGE); - this.messageSession = this.connection.createSession(false, - Session.AUTO_ACKNOWLEDGE); - this.stateProducer = this.stateSession.createProducer(null); - this.stateProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - this.inboxTopic = this.stateSession.createTemporaryTopic(); - - String stateTopicName = STATE_PREFIX + this.groupName; - this.stateTopic = this.stateSession.createTopic(stateTopicName); - this.heartBeatTopic = this.stateSession.createTopic(stateTopicName - + ".heartbeat"); - - String messageDestinationName = GROUP_MESSAGE_PREFIX+ this.groupName; - this.messageTopic = this.messageSession.createTopic(messageDestinationName); - this.messageQueue = this.messageSession.createQueue(messageDestinationName); - MessageConsumer privateInbox = this.messageSession - .createConsumer(this.inboxTopic); - - - - privateInbox.setMessageListener(new MessageListener() { - public void onMessage(Message message) { - processJMSMessage(message); - } - }); - MessageConsumer mapChangeConsumer = this.stateSession - .createConsumer(this.stateTopic); - String memberId = null; - if (mapChangeConsumer instanceof ActiveMQMessageConsumer) { - memberId = ((ActiveMQMessageConsumer) mapChangeConsumer) - .getConsumerId().toString(); - } else { - memberId = this.idGenerator.generateId(); - } - this.local.setId(memberId); - this.local.setInBoxDestination(this.inboxTopic); - this.local.setCoordinatorWeight(getCoordinatorWeight()); - mapChangeConsumer.setMessageListener(new MessageListener() { - public void onMessage(Message message) { - processJMSMessage(message); - } - }); - - this.messageProducer = this.messageSession.createProducer(null); - this.messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - MessageConsumer topicMessageConsumer = this.messageSession.createConsumer(this.messageTopic); - topicMessageConsumer.setMessageListener(new MessageListener() { - public void onMessage(Message message) { - processJMSMessage(message); - } - }); - - MessageConsumer queueMessageConsumer = this.messageSession.createConsumer(this.messageQueue); - queueMessageConsumer.setMessageListener(new MessageListener() { - public void onMessage(Message message) { - processJMSMessage(message); - } - }); - - - MessageConsumer heartBeatConsumer = this.stateSession - .createConsumer(this.heartBeatTopic); - heartBeatConsumer.setMessageListener(new MessageListener() { - public void onMessage(Message message) { - handleHeartbeats(message); - } - }); - this.consumerEvents = new ConsumerEventSource(this.connection, - this.stateTopic); - this.consumerEvents.setConsumerListener(new ConsumerListener() { - public void onConsumerEvent(ConsumerEvent event) { - handleConsumerEvents(event); - } - }); - this.consumerEvents.start(); - - this.electionExecutor = new ThreadPoolExecutor(1, 1, - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(), - new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "Election{" - + Group.this.local + "}"); - thread.setDaemon(true); - return thread; - } - }); - - this.stateExecutor = Executors - .newSingleThreadExecutor(new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "Group State{" - + Group.this.local + "}"); - thread.setDaemon(true); - return thread; - } - }); - - this.messageExecutor = Executors - .newSingleThreadExecutor(new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "Group Messages{" - + Group.this.local + "}"); - thread.setDaemon(true); - return thread; - } - }); - sendHeartBeat(); - this.heartBeatTask = new SchedulerTimerTask(new Runnable() { - public void run() { - sendHeartBeat(); - } - }); - this.checkMembershipTask = new SchedulerTimerTask(new Runnable() { - public void run() { - checkMembership(); - } - }); - - this.expirationTask = new SchedulerTimerTask(new Runnable() { - public void run() { - expirationSweep(); - } - }); - this.timer = new Timer("Distributed heart beat", true); - this.timer.scheduleAtFixedRate(this.heartBeatTask, - getHeartBeatInterval() / 3, getHeartBeatInterval() / 2); - this.timer.scheduleAtFixedRate(this.checkMembershipTask, - getHeartBeatInterval(), getHeartBeatInterval()); - this.timer.scheduleAtFixedRate(this.expirationTask, - EXPIRATION_SWEEP_INTERVAL, EXPIRATION_SWEEP_INTERVAL); - // await for members to join - long timeout = this.heartBeatInterval * this.minimumGroupSize; - long deadline = System.currentTimeMillis() + timeout; - while ((this.members.size() < this.minimumGroupSize || !this.electionFinished - .get()) - && timeout > 0) { - synchronized (this.memberMutex) { - this.memberMutex.wait(timeout); - } - timeout = Math.max(deadline - System.currentTimeMillis(), 0); - } - } - } - - /** - * stop membership to the group - * - * @throws Exception - */ - public void stop() { - if (this.started.compareAndSet(true, false)) { - this.expirationTask.cancel(); - this.checkMembershipTask.cancel(); - this.heartBeatTask.cancel(); - this.expirationTask.cancel(); - this.timer.purge(); - if (this.electionExecutor != null) { - this.electionExecutor.shutdownNow(); - } - if (this.stateExecutor != null) { - this.stateExecutor.shutdownNow(); - } - if(this.messageExecutor != null) { - this.messageExecutor.shutdownNow(); - } - try { - this.consumerEvents.stop(); - this.stateSession.close(); - this.messageSession.close(); - } catch (Exception e) { - LOG.debug("Caught exception stopping", e); - } - } - } - - /** - * @return true if there is elections have finished - */ - public boolean isElectionFinished() { - return this.electionFinished.get(); - } - - /** - * @return the partitionName - */ - public String getGroupName() { - return this.groupName; - } - - /** - * @return the name ofthis map - */ - public String getName() { - return this.local.getName(); - } - - /** - * @return true if by default always lock objects (default is false) - */ - public boolean isAlwaysLock() { - return this.alwaysLock; - } - - /** - * @param alwaysLock - set true if objects inserted will always - * be locked (default is false) - */ - public void setAlwaysLock(boolean alwaysLock) { - this.alwaysLock = alwaysLock; - } - - /** - * @return the heartBeatInterval - */ - public long getHeartBeatInterval() { - return this.heartBeatInterval; - } - - /** - * @param heartBeatInterval - * the heartBeatInterval to set - */ - public void setHeartBeatInterval(long heartBeatInterval) { - this.heartBeatInterval = heartBeatInterval; - } - - /** - * Add a listener for membership changes - * @param l - */ - public void addMemberChangedListener(MemberChangedListener l) { - this.membershipListeners.add(l); - } - - /** - * Remove a listener for membership changes - * @param l - */ - public void removeMemberChangedListener(MemberChangedListener l) { - this.membershipListeners.remove(l); - } - - /** - * Add a listener for map changes - * @param l - */ - public void addMapChangedListener(MapChangedListener l) { - this.mapChangedListeners.add(l); - } - - /** - * Remove a listener for map changes - * @param l - */ - public void removeMapChangedListener(MapChangedListener l) { - this.mapChangedListeners.remove(l); - } - - /** - * Add a listener for group messages - * @param l - */ - public void addGroupMessageListener(GroupMessageListener l) { - this.groupMessageListeners.add(l); - } - - - /** - * remove a listener for group messages - * @param l - */ - public void removeGroupMessageListener(GroupMessageListener l) { - this.groupMessageListeners.remove(l); - } - - - /** - * @return the timeToLive - */ - public int getTimeToLive() { - return this.timeToLive; - } - - /** - * @param timeToLive - * the timeToLive to set - */ - public void setTimeToLive(int timeToLive) { - this.timeToLive = timeToLive; - } - - /** - * @return the removeOwnedObjectsOnExit - */ - public boolean isRemoveOwnedObjectsOnExit() { - return this.removeOwnedObjectsOnExit; - } - - /** - * Sets the policy for owned objects in the group If set to true, when this - * GroupMap stops, - * any objects it owns will be removed from the group map - * @param removeOwnedObjectsOnExit the removeOwnedObjectsOnExit to set - */ - public void setRemoveOwnedObjectsOnExit(boolean removeOwnedObjectsOnExit) { - this.removeOwnedObjectsOnExit = removeOwnedObjectsOnExit; - } - - /** - * @return releaseLockOnExit - true by default - */ - public boolean isReleaseLockOnExit() { - return releaseLockOnExit; - } - - /** - * set release lock on exit - true by default - * @param releaseLockOnExit the releaseLockOnExit to set - */ - public void setReleaseLockOnExit(boolean releaseLockOnExit) { - this.releaseLockOnExit = releaseLockOnExit; - } - - /** - * @return the lockTimeToLive - */ - public int getLockTimeToLive() { - return lockTimeToLive; - } - - /** - * @param lockTimeToLive the lockTimeToLive to set - */ - public void setLockTimeToLive(int lockTimeToLive) { - this.lockTimeToLive = lockTimeToLive; - } - - /** - * @return the minimumGroupSize - */ - public int getMinimumGroupSize() { - return this.minimumGroupSize; - } - - /** - * @param minimumGroupSize - * the minimumGroupSize to set - */ - public void setMinimumGroupSize(int minimumGroupSize) { - this.minimumGroupSize = minimumGroupSize; - } - - /** - * @return the coordinatorWeight - */ - public int getCoordinatorWeight() { - return this.coordinatorWeight; - } - /** - * @param coordinatorWeight the coordinatorWeight to set - */ - public void setCoordinatorWeight(int coordinatorWeight) { - this.coordinatorWeight = coordinatorWeight; - } - - /** - * clear entries from the Map - * - * @throws IllegalStateException - */ - public void clear() throws IllegalStateException { - checkStatus(); - if (this.localMap != null && !this.localMap.isEmpty()) { - Set keys = null; - synchronized (this.mapMutex) { - keys = new HashSet(this.localMap.keySet()); - } - for (K key : keys) { - remove(key); - } - } - this.localMap.clear(); - } - - public boolean containsKey(Object key) { - synchronized (this.mapMutex) { - return this.localMap != null ? this.localMap.containsKey(key) - : false; - } - } - - public boolean containsValue(Object value) { - EntryValue entryValue = new EntryValue(null, value); - synchronized (this.mapMutex) { - return this.localMap != null ? this.localMap - .containsValue(entryValue) : false; - } - } - - public Set> entrySet() { - Map result = new HashMap(); - synchronized (this.mapMutex) { - if (this.localMap != null) { - for (EntryValue entry : this.localMap.values()) { - result.put((K) entry.getKey(), entry.getValue()); - } - } - } - return result.entrySet(); - } - - public V get(Object key) { - EntryValue value = null; - synchronized (this.mapMutex) { - value = this.localMap != null ? this.localMap.get(key) : null; - } - return value != null ? value.getValue() : null; - } - - public boolean isEmpty() { - synchronized (this.mapMutex) { - return this.localMap != null ? this.localMap.isEmpty() : true; - } - } - - public Set keySet() { - Set result = null; - synchronized(this.mapMutex) { - result = new HashSet(this.localMap.keySet()); - } - return result; - } - - /** - * Puts an value into the map associated with the key - * - * @param key - * @param value - * @return the old value or null - * @throws GroupMapUpdateException - * @throws IllegalStateException - * - */ - public V put(K key, V value) throws GroupMapUpdateException, - IllegalStateException { - return put(key, value, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),isReleaseLockOnExit(), - getTimeToLive(),getLockTimeToLive()); - } - - /** - * Puts an value into the map associated with the key - * - * @param key - * @param value - * @param lock - * @param removeOnExit - * @param releaseLockOnExit - * @param timeToLive - * @param lockTimeToLive - * @return the old value or null - * @throws GroupMapUpdateException - * @throws IllegalStateException - * - */ - public V put(K key, V value, boolean lock, boolean removeOnExit,boolean releaseLockOnExit, - long timeToLive,long lockTimeToLive) throws GroupMapUpdateException, - IllegalStateException { - checkStatus(); - EntryKey entryKey = new EntryKey(this.local, key); - entryKey.setLocked(lock); - entryKey.setRemoveOnExit(removeOnExit); - entryKey.setReleaseLockOnExit(releaseLockOnExit); - entryKey.setTimeToLive(timeToLive); - entryKey.setLockTimeToLive(lockTimeToLive); - EntryMessage entryMsg = new EntryMessage(); - entryMsg.setKey(entryKey); - entryMsg.setValue(value); - entryMsg.setType(EntryMessage.MessageType.INSERT); - return (V) sendStateRequest(getCoordinator(), entryMsg); - } - - /** - * Remove a lock on a key - * @param key - * @throws GroupMapUpdateException - */ - public void unlock(K key) throws GroupMapUpdateException{ - EntryKey entryKey = new EntryKey(this.local, key); - entryKey.setLocked(false); - EntryMessage entryMsg = new EntryMessage(); - entryMsg.setKey(entryKey); - entryMsg.setLockUpdate(true); - sendStateRequest(getCoordinator(), entryMsg); - } - - /** - * Lock a key in the distributed map - * @param key - * @throws GroupMapUpdateException - */ - public void lock(K key) throws GroupMapUpdateException{ - lock(key,getLockTimeToLive()); - } - - /** - * Lock a key in the distributed map - * @param key - * @param lockTimeToLive - * @throws GroupMapUpdateException - */ - public void lock(K key,long lockTimeToLive) throws GroupMapUpdateException{ - EntryKey entryKey = new EntryKey(this.local, key); - entryKey.setLocked(true); - entryKey.setLockTimeToLive(lockTimeToLive); - EntryMessage entryMsg = new EntryMessage(); - entryMsg.setKey(entryKey); - entryMsg.setLockUpdate(true); - sendStateRequest(getCoordinator(), entryMsg); - } - - /** - * Add the Map to the distribution - * - * @param t - * @throws GroupMapUpdateException - * @throws IllegalStateException - */ - public void putAll(Map t) - throws GroupMapUpdateException, IllegalStateException { - putAll(t, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),isReleaseLockOnExit(), - getTimeToLive(),getLockTimeToLive()); - } - - /** - * Add the Map to the distribution - * - * @param t - * @param lock - * @param removeOnExit - * @param releaseLockOnExit - * @param timeToLive - * @param lockTimeToLive - * @throws GroupMapUpdateException - * @throws IllegalStateException - */ - public void putAll(Map t, boolean lock, boolean removeOnExit,boolean releaseLockOnExit, - long timeToLive,long lockTimeToLive) - throws GroupMapUpdateException, IllegalStateException { - for (java.util.Map.Entry entry : t.entrySet()) { - put(entry.getKey(), entry.getValue(), lock, removeOnExit, - releaseLockOnExit, timeToLive, lockTimeToLive); - } - } - - /** - * remove a value from the map associated with the key - * - * @param key - * @return the Value or null - * @throws GroupMapUpdateException - * @throws IllegalStateException - * - */ - public V remove(Object key) throws GroupMapUpdateException, - IllegalStateException { - EntryKey entryKey = new EntryKey(this.local, (K) key); - return doRemove(entryKey); - } - - V doRemove(EntryKey key) throws GroupMapUpdateException, - IllegalStateException { - checkStatus(); - EntryMessage entryMsg = new EntryMessage(); - entryMsg.setKey(key); - entryMsg.setType(EntryMessage.MessageType.DELETE); - return (V) sendStateRequest(getCoordinator(), entryMsg); - } - - public int size() { - synchronized (this.mapMutex) { - return this.localMap != null ? this.localMap.size() : 0; - } - } - - public Collection values() { - List result = new ArrayList(); - synchronized (this.mapMutex) { - if (this.localMap != null) { - for (EntryValue value : this.localMap.values()) { - result.add(value.getValue()); - } - } - } - return result; - } - - /** - * @return a set of the members - */ - public Set members() { - Set result = new HashSet(); - result.addAll(this.members.values()); - return result; - } - - /** - * Get a member by its unique id - * @param id - * @return - */ - public Member getMemberById(String id) { - return this.members.get(id); - } - - /** - * Return a member of the Group with the matching name - * @param name - * @return - */ - public Member getMemberByName(String name) { - if (name != null) { - for (Member member :this.members.values()) { - if(member.getName().equals(name)) { - return member; - } - } - } - return null; - } - - /** - * @return the local member that represents this Group instance - */ - public Member getLocalMember() { - return this.local; - } - - /** - * @param key - * @return true if this is the owner of the key - */ - public boolean isOwner(K key) { - EntryValue entryValue = null; - synchronized (this.mapMutex) { - entryValue = this.localMap != null ? this.localMap.get(key) - : null; - } - boolean result = false; - if (entryValue != null) { - result = entryValue.getKey().getOwner().getId().equals(this.local.getId()); - } - return result; - } - - /** - * Get the owner of a key - * - * @param key - * @return the owner - or null if the key doesn't exist - */ - EntryKey getKey(Object key) { - EntryValue entryValue = null; - synchronized (this.mapMutex) { - entryValue = this.localMap != null ? this.localMap.get(key) - : null; - } - return entryValue != null ? entryValue.getKey() : null; - } - - /** - * @return true if the coordinator for the map - */ - public boolean isCoordinator() { - return this.local.equals(this.coordinator); - } - - /** - * @return the coordinator - */ - public Member getCoordinator() { - return this.coordinator; - } - - /** - * Broadcast a message to the group - * @param message - * @throws JMSException - */ - public void broadcastMessage(Object message) throws JMSException { - checkStatus(); - ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message); - objMsg.setJMSCorrelationID(this.idGenerator.generateId()); - objMsg.setJMSType(MESSAGE_TYPE); - objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId()); - this.messageProducer.send(this.messageTopic, objMsg); - } - - /** - * As the group for a response - one will be selected from the group - * @param member - * @param message - * @param timeout in milliseconds - a value if 0 means wait until complete - * @return - * @throws JMSException - */ - public Serializable broadcastMessageRequest(Object message, long timeout) - throws JMSException { - checkStatus(); - Object result = null; - MapRequest request = new MapRequest(); - String id = this.idGenerator.generateId(); - synchronized (this.messageRequests) { - this.messageRequests.put(id, request); - } - ObjectMessage objMsg = this.stateSession - .createObjectMessage((Serializable) message); - objMsg.setJMSReplyTo(this.inboxTopic); - objMsg.setJMSCorrelationID(id); - objMsg.setJMSType(MESSAGE_TYPE); - objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId()); - this.messageProducer.send(this.messageQueue, objMsg); - result = request.get(timeout); - return (Serializable) result; - } - - /** - * Send a message to the group - but only the least loaded member will process it - * @param message - * @throws JMSException - */ - public void sendMessage(Object message) throws JMSException { - checkStatus(); - ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message); - objMsg.setJMSCorrelationID(this.idGenerator.generateId()); - objMsg.setJMSType(MESSAGE_TYPE); - objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId()); - this.messageProducer.send(this.messageQueue, objMsg); - } - - /** - * Send a message to an individual member - * @param member - * @param message - * @throws JMSException - */ - public void sendMessage(Member member, Object message) throws JMSException { - checkStatus(); - ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message); - objMsg.setJMSCorrelationID(this.idGenerator.generateId()); - objMsg.setJMSType(MESSAGE_TYPE); - objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId()); - this.messageProducer.send(member.getInBoxDestination(), objMsg); - } - - /** - * Send a request to a member - * @param member - * @param message - * @param timeout in milliseconds - a value if 0 means wait until complete - * @return the request or null - * @throws JMSException - */ - public Object sendMessageRequest(Member member, Object message,long timeout) throws JMSException { - checkStatus(); - Object result = null; - MapRequest request = new MapRequest(); - String id = this.idGenerator.generateId(); - synchronized (this.messageRequests) { - this.messageRequests.put(id, request); - } - ObjectMessage objMsg = this.stateSession - .createObjectMessage((Serializable) message); - objMsg.setJMSReplyTo(this.inboxTopic); - objMsg.setJMSCorrelationID(id); - objMsg.setJMSType(MESSAGE_TYPE); - objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId()); - this.messageProducer.send(member.getInBoxDestination(), objMsg); - result = request.get(timeout); - return result; - } - - /** - * send a response to a message - * @param member - * @param replyId - * @param message - * @throws JMSException - */ - public void sendMessageResponse(Member member,String replyId, Object message) throws JMSException { - checkStatus(); - ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message); - objMsg.setJMSCorrelationID(replyId); - objMsg.setJMSType(MESSAGE_TYPE); - objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId()); - this.messageProducer.send(member.getInBoxDestination(), objMsg); - } - - /** - * Select a coordinator - coordinator weighting is used - or if everything - * is equal - a comparison of member ids. - * - * @param members - * @return - */ - protected Member selectCordinator(List list) { - Collections.sort(list, new Comparator() { - public int compare(Member m1, Member m2) { - int result = m1.getCoordinatorWeight() - - m2.getCoordinatorWeight(); - if (result == 0) { - result = m1.getId().compareTo(m2.getId()); - } - return result; - } - }); - Member result = list.isEmpty() ? this.local : list.get(list.size()-1); - return result; - } - - Object sendStateRequest(Member member, Serializable payload) { - Object result = null; - MapRequest request = new MapRequest(); - String id = this.idGenerator.generateId(); - synchronized (this.stateRequests) { - this.stateRequests.put(id, request); - } - try { - ObjectMessage objMsg = this.stateSession.createObjectMessage(payload); - objMsg.setJMSReplyTo(this.inboxTopic); - objMsg.setJMSCorrelationID(id); - objMsg.setJMSType(STATE_TYPE); - this.stateProducer.send(member.getInBoxDestination(), objMsg); - result = request.get(getHeartBeatInterval()); - } catch (JMSException e) { - if (this.started.get()) { - LOG.error("Failed to send request " + payload, e); - } - } - if (result instanceof GroupMapUpdateException) { - throw (GroupMapUpdateException) result; - } - if (result instanceof EntryMessage) { - EntryMessage entryMsg = (EntryMessage) result; - result = entryMsg.getOldValue(); - } - return result; - } - - void sendAsyncStateRequest(AsyncMapRequest asyncRequest, Member member, - Serializable payload) { - MapRequest request = new MapRequest(); - String id = this.idGenerator.generateId(); - asyncRequest.add(id, request); - synchronized (this.stateRequests) { - this.stateRequests.put(id, request); - } - try { - ObjectMessage objMsg = this.stateSession.createObjectMessage(payload); - objMsg.setJMSReplyTo(this.inboxTopic); - objMsg.setJMSCorrelationID(id); - objMsg.setJMSType(STATE_TYPE); - this.stateProducer.send(member.getInBoxDestination(), objMsg); - } catch (JMSException e) { - if (this.started.get()) { - LOG.error("Failed to send async request " + payload, e); - } - } - } - - void sendReply(Object reply, Destination replyTo, String id) { - if (this.started.get()) { - if (replyTo != null) { - if (replyTo.equals(this.local.getInBoxDestination())) { - processRequest(id, reply); - } else { - try { - ObjectMessage replyMsg = this.stateSession - .createObjectMessage((Serializable) reply); - replyMsg.setJMSCorrelationID(id); - replyMsg.setJMSType(STATE_TYPE); - this.stateProducer.send(replyTo, replyMsg); - } catch (JMSException e) { - LOG.error("Couldn't send reply from co-ordinator", e); - } - } - } else { - LOG.error("NULL replyTo destination"); - } - } - } - - void broadcastMapUpdate(EntryMessage entry, String correlationId) { - if(this.started.get()) { - try { - EntryMessage copy = entry.copy(); - copy.setMapUpdate(true); - ObjectMessage objMsg = this.stateSession.createObjectMessage(copy); - objMsg.setJMSCorrelationID(correlationId); - objMsg.setJMSType(STATE_TYPE); - this.stateProducer.send(this.stateTopic, objMsg); - } catch (JMSException e) { - if (this.started.get()) { - LOG.error("Failed to send EntryMessage " + entry, e); - } - } - } - } - - - - void processJMSMessage(Message message) { - if (message instanceof ObjectMessage) { - ObjectMessage objMsg = (ObjectMessage) message; - - try { - String messageType = objMsg.getJMSType(); - String id = objMsg.getJMSCorrelationID(); - String memberId = objMsg.getStringProperty(MEMBER_ID_PROPERTY); - Destination replyTo = objMsg.getJMSReplyTo(); - Object payload = objMsg.getObject(); - if (messageType != null) { - if (messageType.equals(STATE_TYPE)) { - if (payload instanceof Member) { - handleHeartbeats((Member) payload); - } else if (payload instanceof EntryMessage) { - EntryMessage entryMsg = (EntryMessage) payload; - entryMsg = entryMsg.copy(); - if(entryMsg.isLockUpdate()) { - processLockUpdate(entryMsg, replyTo, id); - } - else if (entryMsg.isMapUpdate()) { - processMapUpdate(entryMsg); - } else { - processEntryMessage(entryMsg, replyTo, id); - } - } else if (payload instanceof ElectionMessage) { - ElectionMessage electionMsg = (ElectionMessage) payload; - electionMsg = electionMsg.copy(); - processElectionMessage(electionMsg, replyTo, id); - } - - }else if (messageType.equals(MESSAGE_TYPE)) { - processGroupMessage(memberId,id, replyTo, payload); - }else { - LOG.error("Unknown message type: " + messageType); - } - processRequest(id, payload); - }else { - LOG.error("Can't process a message type of null"); - } - } catch (JMSException e) { - LOG.warn("Failed to process message: " + message, e); - } - } - } - - void processRequest(String id, Object value) { - if (id != null) { - MapRequest result = null; - synchronized (this.stateRequests) { - result = this.stateRequests.remove(id); - } - if (result != null) { - result.put(id, value); - } - } - } - - void processLockUpdate(EntryMessage entryMsg, Destination replyTo, - String correlationId) { - synchronized (this.mapMutex) { - boolean newLock = entryMsg.getKey().isLocked(); - Member newOwner = entryMsg.getKey().getOwner(); - long newLockExpiration = newLock ? entryMsg.getKey().getLockExpiration():0l; - System.err.println(getName()+" PROC LOC = " + newOwner.getName()+ " LOCK = "+newLock); - - if (isCoordinator() && !entryMsg.isMapUpdate()) { - - EntryKey originalKey = getKey(entryMsg.getKey().getKey()); - if (originalKey != null) { - if (originalKey.isLocked()) { - if (!originalKey.getOwner().equals( - entryMsg.getKey().getOwner())) { - Serializable reply = new GroupMapUpdateException( - "Owned by " + originalKey.getOwner()); - sendReply(reply, replyTo, correlationId); - System.err.println(getName()+" PROC LOC SEND EXCEPTION TO " + newOwner.getName()); - } else { - originalKey.setLocked(newLock); - originalKey.setOwner(newOwner); - originalKey.setLockExpiration(newLockExpiration); - broadcastMapUpdate(entryMsg, correlationId); - - } - } else { - originalKey.setLocked(newLock); - originalKey.setOwner(newOwner); - originalKey.setLockExpiration(newLockExpiration); - broadcastMapUpdate(entryMsg, correlationId); - } - - } - } else { - EntryKey originalKey = getKey(entryMsg.getKey().getKey()); - if (originalKey != null) { - originalKey.setLocked(newLock); - originalKey.setOwner(newOwner); - originalKey.setLockExpiration(newLockExpiration); - } - } - } - } - - void processEntryMessage(EntryMessage entryMsg, Destination replyTo, - String correlationId) { - if (isCoordinator()) { - EntryKey key = entryMsg.getKey(); - EntryValue value = new EntryValue(key, - (V) entryMsg.getValue()); - boolean insert = entryMsg.isInsert(); - boolean containsKey = false; - synchronized (this.mapMutex) { - containsKey = this.localMap.containsKey(key.getKey()); - } - if (containsKey) { - EntryKey originalKey = getKey(key.getKey()); - if (originalKey.equals(key.getOwner()) || !originalKey.isLocked()) { - EntryValue old = null; - if (insert) { - synchronized (this.mapMutex) { - old = this.localMap.put(key.getKey(), value); - } - } else { - synchronized (this.mapMutex) { - old = this.localMap.remove(key.getKey()); - } - } - entryMsg.setOldValue(old.getValue()); - broadcastMapUpdate(entryMsg, correlationId); - fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), value - .getValue(), false); - } else { - Serializable reply = new GroupMapUpdateException( - "Owned by " + originalKey.getOwner()); - sendReply(reply, replyTo, correlationId); - } - } else { - if (insert) { - synchronized (this.mapMutex) { - this.localMap.put(key.getKey(), value); - } - broadcastMapUpdate(entryMsg, correlationId); - fireMapChanged(key.getOwner(), key.getKey(), null, value - .getValue(), false); - } else { - sendReply(null, replyTo, correlationId); - } - } - } - } - - void processMapUpdate(EntryMessage entryMsg) { - boolean containsKey = false; - EntryKey key = entryMsg.getKey(); - EntryValue value = new EntryValue(key, (V) entryMsg - .getValue()); - boolean insert = entryMsg.isInsert()||entryMsg.isSync(); - synchronized (this.mapMutex) { - containsKey = this.localMap.containsKey(key.getKey()); - } - if (!isCoordinator() || entryMsg.isSync()) { - if (containsKey) { - if (key.isLockExpired()) { - EntryValue old = this.localMap.get(key.getKey()); - if (old != null) { - old.getKey().setLocked(false); - } - } else { - EntryValue old = null; - if (insert) { - synchronized (this.mapMutex) { - old = this.localMap.put(key.getKey(), value); - } - } else { - synchronized (this.mapMutex) { - old = this.localMap.remove(key.getKey()); - value.setValue(null); - } - } - - fireMapChanged(key.getOwner(), key.getKey(), - old.getValue(), value.getValue(), entryMsg - .isExpired()); - } - } else { - if (insert) { - synchronized (this.mapMutex) { - this.localMap.put(key.getKey(), value); - } - fireMapChanged(key.getOwner(), key.getKey(), null, value - .getValue(), false); - } - } - } - } - - void processGroupMessage(String memberId,String replyId,Destination replyTo, Object payload) { - Member member = this.members.get(memberId); - if (member != null) { - fireMemberMessage(member, replyId, payload); - } - if (replyId != null) { - MapRequest result = null; - synchronized (this.messageRequests) { - result = this.messageRequests.remove(replyId); - } - if (result != null) { - result.put(replyId, payload); - } - } - } - - void handleHeartbeats(Message message) { - try { - if (message instanceof ObjectMessage) { - ObjectMessage objMsg = (ObjectMessage) message; - Member member = (Member) objMsg.getObject(); - handleHeartbeats(member); - } else { - LOG.warn("Unexpected message: " + message); - } - } catch (JMSException e) { - LOG.warn("Failed to handle heart beat", e); - } - } - - void handleHeartbeats(Member member) { - member.setTimeStamp(System.currentTimeMillis()); - if (this.members.put(member.getId(), member) == null) { - - fireMemberStarted(member); - if (!member.equals(this.local)) { - sendHeartBeat(member.getInBoxDestination()); - } - election(member, true); - synchronized (this.memberMutex) { - this.memberMutex.notifyAll(); - } - } - } - - void handleConsumerEvents(ConsumerEvent event) { - if (!event.isStarted()) { - Member member = this.members.remove(event.getConsumerId() - .toString()); - if (member != null) { - fireMemberStopped(member); - election(member, false); - } - } - } - - void checkMembership() { - if (this.started.get() && this.electionFinished.get()) { - long checkTime = System.currentTimeMillis() - - getHeartBeatInterval(); - boolean doElection = false; - for (Member member : this.members.values()) { - if (member.getTimeStamp() < checkTime) { - LOG.info("Member timestamp expired " + member); - this.members.remove(member.getId()); - fireMemberStopped(member); - doElection = true; - } - } - if (doElection) { - election(null, false); - } - } - } - - void expirationSweep() { - if (isCoordinator() && this.started.get() && this.electionFinished.get()) { - List expiredMessages = null; - List expiredLocks = null; - synchronized (this.mapMutex) { - Map> map = this.localMap; - if (map != null) { - long currentTime = System.currentTimeMillis(); - for (EntryValue value : map.values()) { - EntryKey k = value.getKey(); - if (k.isExpired(currentTime)) { - if (expiredMessages == null) { - expiredMessages = new ArrayList(); - expiredMessages.add(k); - } - }else if (k.isLockExpired(currentTime)) { - k.setLocked(false); - if (expiredLocks == null) { - expiredLocks = new ArrayList(); - expiredLocks.add(k); - } - expiredLocks.add(k); - } - } - } - } - //do the actual removal of entries in a separate thread - if (expiredMessages != null) { - final List expire = expiredMessages; - this.stateExecutor.execute(new Runnable() { - public void run() { - doMessageExpiration(expire); - } - }); - } - if (expiredLocks != null) { - final List expire = expiredLocks; - this.stateExecutor.execute(new Runnable() { - public void run() { - doLockExpiration(expire); - } - }); - } - } - - } - - void doMessageExpiration(List list) { - if (this.started.get() && this.electionFinished.get() - && isCoordinator()) { - for (EntryKey k : list) { - EntryValue old = null; - synchronized (this.mapMutex) { - old = this.localMap.remove(k.getKey()); - } - if (old != null) { - EntryMessage entryMsg = new EntryMessage(); - entryMsg.setType(EntryMessage.MessageType.DELETE); - entryMsg.setExpired(true); - entryMsg.setKey(k); - entryMsg.setValue(old.getValue()); - broadcastMapUpdate(entryMsg, ""); - fireMapChanged(k.getOwner(), k.getKey(), old.getValue(), - null, true); - } - } - } - } - - void doLockExpiration(List list) { - if (this.started.get() && this.electionFinished.get() - && isCoordinator()) { - for (EntryKey k : list) { - - EntryMessage entryMsg = new EntryMessage(); - entryMsg.setType(EntryMessage.MessageType.DELETE); - entryMsg.setLockExpired(true); - entryMsg.setKey(k); - broadcastMapUpdate(entryMsg, ""); - } - - } - } - - void sendHeartBeat() { - sendHeartBeat(this.heartBeatTopic); - } - - void sendHeartBeat(Destination destination) { - if (this.started.get()) { - try { - ObjectMessage msg = this.stateSession - .createObjectMessage(this.local); - msg.setJMSType(STATE_TYPE); - this.stateProducer.send(destination, msg); - } catch (javax.jms.IllegalStateException e) { - // ignore - as we are probably stopping - } catch (Throwable e) { - if (this.started.get()) { - LOG.warn("Failed to send heart beat", e); - } - } - } - } - - void updateNewMemberMap(Member member) { - List>> list = new ArrayList>>(); - synchronized (this.mapMutex) { - if (this.localMap != null) { - for (Map.Entry> entry : this.localMap - .entrySet()) { - list.add(entry); - } - } - } - try { - for (Map.Entry> entry : list) { - EntryMessage entryMsg = new EntryMessage(); - entryMsg.setKey(entry.getValue().getKey()); - entryMsg.setValue(entry.getValue().getValue()); - entryMsg.setType(EntryMessage.MessageType.SYNC); - entryMsg.setMapUpdate(true); - ObjectMessage objMsg = this.stateSession - .createObjectMessage(entryMsg); - if (!member.equals(entry.getValue().getKey().getOwner())) { - objMsg.setJMSType(STATE_TYPE); - this.stateProducer.send(member.getInBoxDestination(), objMsg); - } - } - } catch(javax.jms.IllegalStateException e) { - //ignore - as closing - }catch (JMSException e) { - if (started.get()) { - LOG.warn("Failed to update new member ", e); - } - } - } - - void fireMemberStarted(Member member) { - LOG.info(this.local.getName() + " Member started " + member); - for (MemberChangedListener l : this.membershipListeners) { - l.memberStarted(member); - } - } - - void fireMemberStopped(Member member) { - LOG.info(this.local.getName() + " Member stopped " + member); - for (MemberChangedListener l : this.membershipListeners) { - l.memberStopped(member); - } - // remove all entries owned by the stopped member - List> tmpList = new ArrayList>(); - boolean mapExists = false; - synchronized (this.mapMutex) { - mapExists = this.localMap != null; - if (mapExists) { - for (EntryValue value : this.localMap.values()) { - EntryKey entryKey = value.getKey(); - if (entryKey.getOwner().equals(member)) { - if (entryKey.isRemoveOnExit()) { - tmpList.add(entryKey); - } - if (entryKey.isReleaseLockOnExit()) { - entryKey.setLocked(false); - } - } - } - } - } - if (mapExists) { - for (EntryKey entryKey : tmpList) { - EntryValue value = null; - synchronized (this.mapMutex) { - value = this.localMap.remove(entryKey); - } - fireMapChanged(member, entryKey.getKey(), value.getValue(), - null,false); - } - } - } - - void fireMemberMessage(final Member member,final String replyId,final Object message) { - if (this.started.get() && this.stateExecutor != null - && !this.messageExecutor.isShutdown()) { - this.messageExecutor.execute(new Runnable() { - public void run() { - doFireMemberMessage(member,replyId,message); - } - }); - } - } - - void doFireMemberMessage(Member sender,String replyId,Object message) { - if(this.started.get()) { - for(GroupMessageListener l : this.groupMessageListeners) { - l.messageDelivered(sender,replyId, message); - } - } - } - - void fireMapChanged(final Member owner, final Object key, - final Object oldValue, final Object newValue, final boolean expired) { - if (this.started.get() && this.stateExecutor != null - && !this.stateExecutor.isShutdown()) { - this.stateExecutor.execute(new Runnable() { - public void run() { - doFireMapChanged(owner, key, oldValue, newValue, expired); - } - }); - } - } - - void doFireMapChanged(Member owner, Object key, Object oldValue, - Object newValue, boolean expired) { - if (this.started.get()) { - for (MapChangedListener l : this.mapChangedListeners) { - if (oldValue == null) { - l.mapInsert(owner, key, newValue); - } else if (newValue == null) { - l.mapRemove(owner, key, oldValue, expired); - } else { - l.mapUpdate(owner, key, oldValue, newValue); - } - } - } - } - - void election(final Member member, final boolean memberStarted) { - if (this.started.get() && this.stateExecutor != null - && !this.electionExecutor.isShutdown()) { - synchronized (this.electionExecutor) { - //remove any queued election tasks - List list = new ArrayList( - this.electionExecutor.getQueue()); - for (Runnable r : list) { - this.electionExecutor.remove(r); - } - } - this.electionExecutor.execute(new Runnable() { - public void run() { - doElection(member, memberStarted); - } - }); - } - } - - void doElection(Member member, boolean memberStarted) { - if ((member == null || !member.equals(this.local)) - && (this.electionFinished.compareAndSet(true, false)||true)) { - boolean wasCoordinator = isCoordinator() && !isEmpty(); - // call an election - while (!callElection()&&this.started.get()) - ; - if (this.started.get()) { - List members = new ArrayList(this.members.values()); - this.coordinator = selectCordinator(members); - if (isCoordinator()) { - broadcastElectionType(ElectionMessage.MessageType.COORDINATOR); - } - if (memberStarted && member != null) { - if (wasCoordinator || isCoordinator() && this.started.get()) { - updateNewMemberMap(member); - } - } - if (!this.electionFinished.get()) { - try { - synchronized (this.electionFinished) { - this.electionFinished.wait(this.heartBeatInterval * 2); - } - } catch (InterruptedException e) { - } - } - if (!this.electionFinished.get()) { - // we must be the coordinator - this.coordinator = this.local; - this.electionFinished.set(true); - broadcastElectionType(ElectionMessage.MessageType.COORDINATOR); - } - } - } - } - - boolean callElection() { - List members = new ArrayList(this.members.values()); - AsyncMapRequest request = new AsyncMapRequest(); - for (Member member : members) { - if (this.local.getId().compareTo(member.getId()) < 0) { - ElectionMessage msg = new ElectionMessage(); - msg.setMember(this.local); - msg.setType(ElectionMessage.MessageType.ELECTION); - sendAsyncStateRequest(request, member, msg); - } - } - return request.isSuccess(getHeartBeatInterval()); - } - - void processElectionMessage(ElectionMessage msg, Destination replyTo, - String correlationId) { - if (msg.isElection()) { - msg.setType(ElectionMessage.MessageType.ANSWER); - msg.setMember(this.local); - sendReply(msg, replyTo, correlationId); - } else if (msg.isCoordinator()) { - synchronized (this.electionFinished) { - this.coordinator = msg.getMember(); - this.electionFinished.set(true); - this.electionFinished.notifyAll(); - } - } - } - - void broadcastElectionType(ElectionMessage.MessageType type) { - if (started.get()) { - try { - ElectionMessage msg = new ElectionMessage(); - msg.setMember(this.local); - msg.setType(type); - ObjectMessage objMsg = this.stateSession.createObjectMessage(msg); - objMsg.setJMSType(STATE_TYPE); - this.stateProducer.send(this.stateTopic, objMsg); - } catch (javax.jms.IllegalStateException e) { - // ignore - we are stopping - } catch (JMSException e) { - if (this.started.get()) { - LOG.error("Failed to broadcast election message: " + type, - e); - } - } - } - } - - void checkStatus() throws IllegalStateException { - if (!started.get()) { - throw new IllegalStateException("GroupMap " + this.local.getName() - + " not started"); - } - waitForElection(); - } - - void waitForElection() { - synchronized (this.electionFinished) { - while (started.get() && !this.electionFinished.get()) { - try { - this.electionFinished.wait(1000); - } catch (InterruptedException e) { - stop(); - Thread.currentThread().interrupt(); - } - } - } - } - - public String toString() { - return "Group:" + getName() + "{id=" + this.local.getId() - + ",coordinator=" + isCoordinator() + ",inbox=" - + this.local.getInBoxDestination() + "}"; - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java b/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java deleted file mode 100644 index 4633ed3cbd..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.group; - -/** - * thrown when updating a key to map that the local client doesn't own - * - */ -public class GroupMapUpdateException extends RuntimeException { - private static final long serialVersionUID = -7584658017201604560L; - - /** - * @param message - */ - public GroupMapUpdateException(String message) { - super(message); - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/group/GroupMessageListener.java b/activemq-core/src/main/java/org/apache/activemq/group/GroupMessageListener.java deleted file mode 100644 index b3fb64bec1..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/group/GroupMessageListener.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.group; - - -/** - * A listener for message communication - * - */ -public interface GroupMessageListener { - - /** - * Called when a message is delivered to the Group from another member - * @param sender the member who sent the message - * @param replyId the id to use to respond to a message - * @param message the message object - */ - void messageDelivered(Member sender, String replyId, Object message); -} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java b/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java deleted file mode 100644 index 1465eceb3d..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.group; - -/** - *Get notifications about changes to the state of the map - * - */ -public interface MapChangedListener { - - /** - * Called when a key/value pair is inserted into the map - * @param owner - * @param key - * @param value - */ - void mapInsert(Member owner,Object key, Object value); - - /** - * Called when a key value is updated in the map - * @param owner - * @param Key - * @param oldValue - * @param newValue - */ - void mapUpdate(Member owner,Object Key,Object oldValue,Object newValue); - - /** - * Called when a key value is removed from the map - * @param owner - * @param key - * @param value - * @param expired - */ - void mapRemove(Member owner,Object key, Object value,boolean expired); -} diff --git a/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java b/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java deleted file mode 100644 index 117b076230..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.group; - -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Return information about map update - * - */ -public class MapRequest { - private final AtomicBoolean done = new AtomicBoolean(); - private Object response; - private RequestCallback callback; - - Object get(long timeout) { - synchronized (this.done) { - if (this.done.get() == false && this.response == null) { - try { - this.done.wait(timeout); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - return this.response; - } - - void put(String id,Object response) { - this.response = response; - cancel(); - RequestCallback callback = this.callback; - if (callback != null) { - callback.finished(id); - } - } - - void cancel() { - this.done.set(true); - synchronized (this.done) { - this.done.notifyAll(); - } - } - - void setCallback(RequestCallback callback) { - this.callback=callback; - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/group/Member.java b/activemq-core/src/main/java/org/apache/activemq/group/Member.java deleted file mode 100644 index b0b2b68031..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/group/Member.java +++ /dev/null @@ -1,162 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.group; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import javax.jms.Destination; -import org.apache.activemq.util.IdGenerator; -import com.sun.jndi.url.corbaname.corbanameURLContextFactory; - -/** - *

- * A Member holds information about a member of the group - * - */ -public class Member implements Externalizable { - private String name; - private String id; - private String hostname; - private long startTime; - private int coordinatorWeight; - private Destination inBoxDestination; - private transient long timeStamp; - - - /** - * Default constructor - only used by serialization - */ - public Member() { - } - /** - * @param name - */ - public Member(String name) { - this.name = name; - this.hostname = IdGenerator.getHostName(); - this.startTime=System.currentTimeMillis(); - } - - /** - * @return the name - */ - public String getName() { - return this.name; - } - - /** - * @return the id - */ - public String getId() { - return this.id; - } - - void setId(String id) { - this.id=id; - } - - /** - * @return the hostname - */ - public String getHostname() { - return this.hostname; - } - - /** - * @return the startTime - */ - public long getStartTime() { - return this.startTime; - } - - /** - * @return the inbox destination - */ - public Destination getInBoxDestination() { - return this.inBoxDestination; - } - - void setInBoxDestination(Destination dest) { - this.inBoxDestination=dest; - } - - /** - * @return the timeStamp - */ - long getTimeStamp() { - return this.timeStamp; - } - - /** - * @param timeStamp the timeStamp to set - */ - void setTimeStamp(long timeStamp) { - this.timeStamp = timeStamp; - } - /** - * @return the coordinatorWeight - */ - public int getCoordinatorWeight() { - return this.coordinatorWeight; - } - /** - * @param coordinatorWeight the coordinatorWeight to set - */ - public void setCoordinatorWeight(int coordinatorWeight) { - this.coordinatorWeight = coordinatorWeight; - } - - - - public String toString() { - return this.name+"["+this.id+"]@"+this.hostname; - } - - - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - this.coordinatorWeight=in.readInt();; - this.name = in.readUTF(); - this.id = in.readUTF(); - this.hostname = in.readUTF(); - this.startTime=in.readLong(); - this.inBoxDestination=(Destination) in.readObject(); - } - - public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(this.coordinatorWeight); - out.writeUTF(this.name != null ? this.name : ""); - out.writeUTF(this.id != null ? this.id : ""); - out.writeUTF(this.hostname != null ? this.hostname : ""); - out.writeLong(this.startTime); - out.writeObject(this.inBoxDestination); - } - - public int hashCode() { - return this.id.hashCode(); - } - - public boolean equals(Object obj) { - boolean result = false; - if (obj instanceof Member) { - Member other = (Member)obj; - result = this.id.equals(other.id); - } - return result; - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java b/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java deleted file mode 100644 index a5c9ee6385..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.group; - -/** - * A listener for membership changes to a group - * - */ -public interface MemberChangedListener { - - /** - * Notification a member has started - * @param member - */ - void memberStarted(Member member); - - /** - * Notification a member has stopped - * @param member - */ - void memberStopped(Member member); -} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java b/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java deleted file mode 100644 index 90ee9b509b..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.group; - - -/** - * Return information about map update - * - */ -public interface RequestCallback{ - /** - * Optionally called when a request is finished - * @param id - */ - void finished(String id); -} diff --git a/activemq-core/src/main/java/org/apache/activemq/group/package.html b/activemq-core/src/main/java/org/apache/activemq/group/package.html deleted file mode 100755 index d4af9c3fd1..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/group/package.html +++ /dev/null @@ -1,25 +0,0 @@ - - - - - - -Shared state, messaging and membership information between members of a distributed group - - -