mirror of https://github.com/apache/activemq.git
Ensure ordered access to the group and add expiration to state in
the GroupMap git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@683259 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3a77300e06
commit
b11c28615e
|
@ -0,0 +1,61 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.group;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Return information about map update
|
||||
*
|
||||
*/
|
||||
public class AsyncMapRequest implements RequestCallback{
|
||||
private final Object mutex = new Object();
|
||||
|
||||
private Set<String> requests = new HashSet<String>();
|
||||
|
||||
public void add(String id, MapRequest request) {
|
||||
request.setCallback(this);
|
||||
this.requests.add(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for requests
|
||||
* @param timeout
|
||||
* @return
|
||||
*/
|
||||
public boolean isSuccess(long timeout) {
|
||||
long deadline = System.currentTimeMillis() + timeout;
|
||||
while (!this.requests.isEmpty()) {
|
||||
synchronized (this.mutex) {
|
||||
try {
|
||||
this.mutex.wait(timeout);
|
||||
} catch (InterruptedException e) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
|
||||
}
|
||||
return this.requests.isEmpty();
|
||||
}
|
||||
|
||||
|
||||
public void finished(String id) {
|
||||
this.requests.remove(id);
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.group;
|
||||
|
||||
/**
|
||||
* Default implementation of a MapChangedListener
|
||||
*
|
||||
*/
|
||||
public class DefaultMapChangedListener implements MapChangedListener{
|
||||
|
||||
public void mapInsert(Member owner, Object key, Object value) {
|
||||
}
|
||||
|
||||
public void mapRemove(Member owner, Object key, Object value,boolean expired) {
|
||||
}
|
||||
|
||||
public void mapUpdate(Member owner, Object Key, Object oldValue,Object newValue) {
|
||||
}
|
||||
}
|
|
@ -0,0 +1,105 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.group;
|
||||
|
||||
import java.io.Externalizable;
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInput;
|
||||
import java.io.ObjectOutput;
|
||||
|
||||
/**
|
||||
* Used to pass information around
|
||||
*
|
||||
*/
|
||||
public class ElectionMessage implements Externalizable{
|
||||
static enum MessageType{ELECTION,ANSWER,COORDINATOR};
|
||||
private Member member;
|
||||
private MessageType type;
|
||||
|
||||
/**
|
||||
* @return the member
|
||||
*/
|
||||
public Member getMember() {
|
||||
return this.member;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param member the member to set
|
||||
*/
|
||||
public void setMember(Member member) {
|
||||
this.member = member;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the type
|
||||
*/
|
||||
public MessageType getType() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param type the type to set
|
||||
*/
|
||||
public void setType(MessageType type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if election message
|
||||
*/
|
||||
public boolean isElection() {
|
||||
return this.type != null && this.type.equals(MessageType.ELECTION);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if answer message
|
||||
*/
|
||||
public boolean isAnswer() {
|
||||
return this.type != null && this.type.equals(MessageType.ANSWER);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if coordinator message
|
||||
*/
|
||||
public boolean isCoordinator() {
|
||||
return this.type != null && this.type.equals(MessageType.COORDINATOR);
|
||||
}
|
||||
|
||||
|
||||
public ElectionMessage copy() {
|
||||
ElectionMessage result = new ElectionMessage();
|
||||
result.member=this.member;
|
||||
result.type=this.type;
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
public void readExternal(ObjectInput in) throws IOException,
|
||||
ClassNotFoundException {
|
||||
this.member=(Member) in.readObject();
|
||||
this.type=(MessageType) in.readObject();
|
||||
}
|
||||
|
||||
public void writeExternal(ObjectOutput out) throws IOException {
|
||||
out.writeObject(this.member);
|
||||
out.writeObject(this.type);
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "ElectionMessage: "+ this.member + "{"+this.type+ "}";
|
||||
}
|
||||
}
|
|
@ -30,6 +30,7 @@ class EntryKey<K> implements Externalizable {
|
|||
private K key;
|
||||
private boolean share;
|
||||
private boolean removeOnExit;
|
||||
private long expiration;
|
||||
|
||||
/**
|
||||
* Default constructor - for serialization
|
||||
|
@ -88,6 +89,38 @@ class EntryKey<K> implements Externalizable {
|
|||
public void setRemoveOnExit(boolean removeOnExit) {
|
||||
this.removeOnExit = removeOnExit;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the expiration
|
||||
*/
|
||||
public long getExpiration() {
|
||||
return expiration;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param expiration the expiration to set
|
||||
*/
|
||||
public void setExpiration(long expiration) {
|
||||
this.expiration = expiration;
|
||||
}
|
||||
|
||||
void setTimeToLive(long ttl) {
|
||||
if (ttl > 0 ) {
|
||||
this.expiration=ttl+System.currentTimeMillis();
|
||||
}else {
|
||||
this.expiration =0l;
|
||||
}
|
||||
}
|
||||
|
||||
boolean isExpired() {
|
||||
return isExpired(System.currentTimeMillis());
|
||||
}
|
||||
|
||||
boolean isExpired(long currentTime) {
|
||||
return this.expiration > 0 && this.expiration < currentTime;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public boolean equals(Object obj) {
|
||||
boolean result = false;
|
||||
|
@ -103,6 +136,7 @@ class EntryKey<K> implements Externalizable {
|
|||
out.writeObject(this.key);
|
||||
out.writeBoolean(isShare());
|
||||
out.writeBoolean(isRemoveOnExit());
|
||||
out.writeLong(getExpiration());
|
||||
}
|
||||
|
||||
public void readExternal(ObjectInput in) throws IOException,
|
||||
|
@ -111,5 +145,10 @@ class EntryKey<K> implements Externalizable {
|
|||
this.key = (K) in.readObject();
|
||||
this.share = in.readBoolean();
|
||||
this.removeOnExit=in.readBoolean();
|
||||
this.expiration=in.readLong();
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "key:"+this.key;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,19 +26,21 @@ import java.io.ObjectOutput;
|
|||
*
|
||||
*/
|
||||
public class EntryMessage implements Externalizable{
|
||||
static enum MessageType{INSERT,DELETE};
|
||||
static enum MessageType{INSERT,DELETE,SYNC};
|
||||
private EntryKey key;
|
||||
private Object value;
|
||||
private MessageType type;
|
||||
private boolean mapUpdate;
|
||||
private boolean expired;
|
||||
|
||||
/**
|
||||
* @return the owner
|
||||
*/
|
||||
public EntryKey getKey() {
|
||||
return key;
|
||||
return this.key;
|
||||
}
|
||||
/**
|
||||
* @param owner the owner to set
|
||||
* @param key
|
||||
*/
|
||||
public void setKey(EntryKey key) {
|
||||
this.key = key;
|
||||
|
@ -47,7 +49,7 @@ public class EntryMessage implements Externalizable{
|
|||
* @return the value
|
||||
*/
|
||||
public Object getValue() {
|
||||
return value;
|
||||
return this.value;
|
||||
}
|
||||
/**
|
||||
* @param value the value to set
|
||||
|
@ -60,7 +62,7 @@ public class EntryMessage implements Externalizable{
|
|||
* @return the type
|
||||
*/
|
||||
public MessageType getType() {
|
||||
return type;
|
||||
return this.type;
|
||||
}
|
||||
/**
|
||||
* @param type the type to set
|
||||
|
@ -69,18 +71,81 @@ public class EntryMessage implements Externalizable{
|
|||
this.type = type;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the mapUpdate
|
||||
*/
|
||||
public boolean isMapUpdate() {
|
||||
return this.mapUpdate;
|
||||
}
|
||||
/**
|
||||
* @param mapUpdate the mapUpdate to set
|
||||
*/
|
||||
public void setMapUpdate(boolean mapUpdate) {
|
||||
this.mapUpdate = mapUpdate;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the expired
|
||||
*/
|
||||
public boolean isExpired() {
|
||||
return expired;
|
||||
}
|
||||
/**
|
||||
* @param expired the expired to set
|
||||
*/
|
||||
public void setExpired(boolean expired) {
|
||||
this.expired = expired;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return if insert message
|
||||
*/
|
||||
public boolean isInsert() {
|
||||
return this.type != null && this.type.equals(MessageType.INSERT);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if delete message
|
||||
*/
|
||||
public boolean isDelete() {
|
||||
return this.type != null && this.type.equals(MessageType.DELETE);
|
||||
}
|
||||
|
||||
public boolean isSync() {
|
||||
return this.type != null && this.type.equals(MessageType.SYNC);
|
||||
}
|
||||
|
||||
public EntryMessage copy() {
|
||||
EntryMessage result = new EntryMessage();
|
||||
result.key=this.key;
|
||||
result.value=this.value;
|
||||
result.type=this.type;
|
||||
result.mapUpdate=this.mapUpdate;
|
||||
result.expired=this.expired;
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void readExternal(ObjectInput in) throws IOException,
|
||||
ClassNotFoundException {
|
||||
this.key=(EntryKey) in.readObject();
|
||||
this.value=in.readObject();
|
||||
this.type=(MessageType) in.readObject();
|
||||
this.type=(MessageType) in.readObject();
|
||||
this.mapUpdate=in.readBoolean();
|
||||
this.expired=in.readBoolean();
|
||||
}
|
||||
|
||||
public void writeExternal(ObjectOutput out) throws IOException {
|
||||
out.writeObject(this.key);
|
||||
out.writeObject(this.value);
|
||||
out.writeObject(this.type);
|
||||
out.writeBoolean(this.mapUpdate);
|
||||
out.writeBoolean(this.expired);
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "EntryMessage: "+this.type + "[" + this.key + "," + this.value +
|
||||
"]{update=" + this.mapUpdate + "}";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,6 +45,14 @@ class EntryValue<V> {
|
|||
return this.value;
|
||||
}
|
||||
|
||||
/**
|
||||
* set the value
|
||||
* @param value
|
||||
*/
|
||||
public void setValue(V value) {
|
||||
this.value=value;
|
||||
}
|
||||
|
||||
public int hashCode() {
|
||||
return this.value != null ? this.value.hashCode() : super.hashCode();
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -20,13 +20,13 @@ package org.apache.activemq.group;
|
|||
* thrown when updating a key to map that the local client doesn't own
|
||||
*
|
||||
*/
|
||||
public class IllegalAccessException extends java.lang.IllegalStateException {
|
||||
public class GroupMapUpdateException extends RuntimeException {
|
||||
private static final long serialVersionUID = -7584658017201604560L;
|
||||
|
||||
/**
|
||||
* @param message
|
||||
*/
|
||||
public IllegalAccessException(String message) {
|
||||
public GroupMapUpdateException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
|
@ -23,11 +23,28 @@ package org.apache.activemq.group;
|
|||
public interface MapChangedListener {
|
||||
|
||||
/**
|
||||
* Called when the map has changed
|
||||
* Called when a key/value pair is inserted into the map
|
||||
* @param owner
|
||||
* @param key
|
||||
* @param value
|
||||
*/
|
||||
void mapInsert(Member owner,Object key, Object value);
|
||||
|
||||
/**
|
||||
* Called when a key value is updated in the map
|
||||
* @param owner
|
||||
* @param Key
|
||||
* @param oldValue
|
||||
* @param newValue
|
||||
*/
|
||||
void mapChanged(Member owner,Object key, Object oldValue, Object newValue);
|
||||
void mapUpdate(Member owner,Object Key,Object oldValue,Object newValue);
|
||||
|
||||
/**
|
||||
* Called when a key value is removed from the map
|
||||
* @param owner
|
||||
* @param key
|
||||
* @param value
|
||||
* @param expired
|
||||
*/
|
||||
void mapRemove(Member owner,Object key, Object value,boolean expired);
|
||||
}
|
||||
|
|
|
@ -25,12 +25,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
public class MapRequest {
|
||||
private final AtomicBoolean done = new AtomicBoolean();
|
||||
private Object response;
|
||||
private RequestCallback callback;
|
||||
|
||||
Object get(int timeout) {
|
||||
synchronized (done) {
|
||||
if (done.get() == false && response == null) {
|
||||
Object get(long timeout) {
|
||||
synchronized (this.done) {
|
||||
if (this.done.get() == false && this.response == null) {
|
||||
try {
|
||||
done.wait(timeout);
|
||||
this.done.wait(timeout);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
@ -39,15 +40,23 @@ public class MapRequest {
|
|||
return this.response;
|
||||
}
|
||||
|
||||
void put(Object response) {
|
||||
void put(String id,Object response) {
|
||||
this.response = response;
|
||||
cancel();
|
||||
RequestCallback callback = this.callback;
|
||||
if (callback != null) {
|
||||
callback.finished(id);
|
||||
}
|
||||
}
|
||||
|
||||
void cancel() {
|
||||
done.set(true);
|
||||
synchronized (done) {
|
||||
done.notifyAll();
|
||||
this.done.set(true);
|
||||
synchronized (this.done) {
|
||||
this.done.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
void setCallback(RequestCallback callback) {
|
||||
this.callback=callback;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.group;
|
||||
|
||||
|
||||
/**
|
||||
* Return information about map update
|
||||
*
|
||||
*/
|
||||
public interface RequestCallback{
|
||||
/**
|
||||
* Optionally called when a request is finished
|
||||
* @param id
|
||||
*/
|
||||
void finished(String id);
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.group;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
|
||||
|
||||
public class GroupMapMemberTest extends TestCase {
|
||||
protected BrokerService broker;
|
||||
protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
|
||||
|
||||
/**
|
||||
* Test method for
|
||||
* {@link org.apache.activemq.group.GroupMap#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}.
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testGroup() throws Exception {
|
||||
|
||||
int number = 20;
|
||||
List<Connection>connections = new ArrayList<Connection>();
|
||||
List<GroupMap>groupMaps = new ArrayList<GroupMap>();
|
||||
ConnectionFactory factory = createConnectionFactory();
|
||||
for (int i =0; i < number; i++) {
|
||||
Connection connection = factory.createConnection();
|
||||
connection.start();
|
||||
connections.add(connection);
|
||||
GroupMap map = new GroupMap(connection,"map"+i);
|
||||
map.setHeartBeatInterval(20000);
|
||||
if(i ==number-1) {
|
||||
map.setMinimumGroupSize(number);
|
||||
}
|
||||
map.start();
|
||||
groupMaps.add(map);
|
||||
}
|
||||
|
||||
int coordinator = 0;
|
||||
for (GroupMap map:groupMaps) {
|
||||
if (map.isCoordinator()) {
|
||||
coordinator++;
|
||||
}
|
||||
}
|
||||
|
||||
assertEquals(1,coordinator);
|
||||
groupMaps.get(0).put("key", "value");
|
||||
Thread.sleep(2000);
|
||||
for (GroupMap map:groupMaps) {
|
||||
assertTrue(map.get("key").equals("value"));
|
||||
}
|
||||
for(GroupMap map:groupMaps) {
|
||||
map.stop();
|
||||
}
|
||||
for (Connection connection:connections) {
|
||||
connection.stop();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
if (broker == null) {
|
||||
broker = createBroker();
|
||||
}
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
}
|
||||
}
|
||||
|
||||
protected ActiveMQConnectionFactory createConnectionFactory()throws Exception {
|
||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
|
||||
ActiveMQConnection.DEFAULT_BROKER_URL);
|
||||
return cf;
|
||||
}
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService answer = new BrokerService();
|
||||
configureBroker(answer);
|
||||
answer.start();
|
||||
return answer;
|
||||
}
|
||||
|
||||
protected void configureBroker(BrokerService answer) throws Exception {
|
||||
answer.setPersistent(false);
|
||||
answer.addConnector(bindAddress);
|
||||
answer.setDeleteAllMessagesOnStartup(true);
|
||||
}
|
||||
}
|
||||
|
|
@ -28,7 +28,8 @@ import org.apache.activemq.broker.BrokerService;
|
|||
|
||||
public class GroupMapTest extends TestCase {
|
||||
protected BrokerService broker;
|
||||
protected Connection connection;
|
||||
protected Connection connection1;
|
||||
protected Connection connection2;
|
||||
protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
|
||||
|
||||
/**
|
||||
|
@ -38,7 +39,7 @@ public class GroupMapTest extends TestCase {
|
|||
*/
|
||||
public void testAddMemberChangedListener() throws Exception {
|
||||
final AtomicInteger counter = new AtomicInteger();
|
||||
GroupMap map1 = new GroupMap(connection,"map1");
|
||||
GroupMap map1 = new GroupMap(connection1,"map1");
|
||||
map1.addMemberChangedListener(new MemberChangedListener(){
|
||||
|
||||
public void memberStarted(Member member) {
|
||||
|
@ -64,10 +65,7 @@ public class GroupMapTest extends TestCase {
|
|||
}
|
||||
}
|
||||
assertEquals(1, counter.get());
|
||||
ConnectionFactory factory = createConnectionFactory();
|
||||
Connection connection2 = factory.createConnection();
|
||||
connection2.start();
|
||||
GroupMap map2 = new GroupMap(connection,"map2");
|
||||
GroupMap map2 = new GroupMap(connection2,"map2");
|
||||
map2.start();
|
||||
synchronized(counter) {
|
||||
if (counter.get()<2) {
|
||||
|
@ -78,12 +76,11 @@ public class GroupMapTest extends TestCase {
|
|||
map2.stop();
|
||||
synchronized(counter) {
|
||||
if (counter.get()>=2) {
|
||||
counter.wait(5000);
|
||||
counter.wait(GroupMap.DEFAULT_HEART_BEAT_INTERVAL*3);
|
||||
}
|
||||
}
|
||||
assertEquals(1, counter.get());
|
||||
map1.stop();
|
||||
connection2.close();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -92,48 +89,94 @@ public class GroupMapTest extends TestCase {
|
|||
* @throws Exception
|
||||
*/
|
||||
public void testAddMapChangedListener() throws Exception {
|
||||
GroupMap map = new GroupMap(connection,"map");
|
||||
final AtomicBoolean called = new AtomicBoolean();
|
||||
map.addMapChangedListener(new MapChangedListener(){
|
||||
public void mapChanged(Member owner, Object key, Object oldValue,
|
||||
Object newValue) {
|
||||
synchronized(called) {
|
||||
called.set(true);
|
||||
called.notifyAll();
|
||||
final AtomicBoolean called1 = new AtomicBoolean();
|
||||
final AtomicBoolean called2 = new AtomicBoolean();
|
||||
|
||||
GroupMap map1 = new GroupMap(connection1,"map1");
|
||||
|
||||
map1.addMapChangedListener(new DefaultMapChangedListener() {
|
||||
public void mapInsert(Member owner,Object Key, Object Value) {
|
||||
synchronized(called1) {
|
||||
called1.set(true);
|
||||
called1.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
map.start();
|
||||
map.put("test", "blob");
|
||||
synchronized(called) {
|
||||
if (!called.get()) {
|
||||
called.wait(5000);
|
||||
map1.start();
|
||||
|
||||
GroupMap map2 = new GroupMap(connection2,"map2");
|
||||
|
||||
map2.addMapChangedListener(new DefaultMapChangedListener() {
|
||||
public void mapInsert(Member owner,Object Key, Object Value) {
|
||||
synchronized(called2) {
|
||||
called2.set(true);
|
||||
called2.notifyAll();
|
||||
}
|
||||
}
|
||||
});
|
||||
map2.start();
|
||||
|
||||
|
||||
map1.put("test", "blob");
|
||||
synchronized(called1) {
|
||||
if (!called1.get()) {
|
||||
called1.wait(5000);
|
||||
}
|
||||
}
|
||||
assertTrue(called.get());
|
||||
map.stop();
|
||||
synchronized(called2) {
|
||||
if (!called2.get()) {
|
||||
called2.wait(5000);
|
||||
}
|
||||
}
|
||||
assertTrue(called1.get());
|
||||
assertTrue(called2.get());
|
||||
map1.stop();
|
||||
map2.stop();
|
||||
}
|
||||
|
||||
public void testGetWriteLock() throws Exception {
|
||||
GroupMap map1 = new GroupMap(connection1, "map1");
|
||||
final AtomicBoolean called = new AtomicBoolean();
|
||||
map1.start();
|
||||
GroupMap map2 = new GroupMap(connection2, "map2");
|
||||
map2.setMinimumGroupSize(2);
|
||||
map2.start();
|
||||
map2.put("test", "foo");
|
||||
try {
|
||||
map1.put("test", "bah");
|
||||
fail("Should have thrown an exception!");
|
||||
} catch (GroupMapUpdateException e) {
|
||||
}
|
||||
map1.stop();
|
||||
map2.stop();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test method for {@link org.apache.activemq.group.GroupMap#clear()}.
|
||||
* @throws Exception
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testClear() throws Exception {
|
||||
GroupMap map1 = new GroupMap(connection,"map1");
|
||||
GroupMap map1 = new GroupMap(connection1,"map1");
|
||||
final AtomicBoolean called = new AtomicBoolean();
|
||||
map1.addMapChangedListener(new MapChangedListener(){
|
||||
public void mapChanged(Member owner, Object key, Object oldValue,
|
||||
Object newValue) {
|
||||
map1.addMapChangedListener(new DefaultMapChangedListener() {
|
||||
public void mapInsert(Member owner,Object Key, Object Value) {
|
||||
synchronized(called) {
|
||||
called.set(true);
|
||||
called.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
public void mapRemove(Member owner, Object key, Object value,boolean expired) {
|
||||
synchronized(called) {
|
||||
called.set(true);
|
||||
called.notifyAll();
|
||||
}
|
||||
}
|
||||
});
|
||||
map1.start();
|
||||
GroupMap map2 = new GroupMap(connection,"map2");
|
||||
GroupMap map2 = new GroupMap(connection2,"map2");
|
||||
map2.start();
|
||||
map2.put("test","foo");
|
||||
synchronized(called) {
|
||||
|
@ -159,20 +202,19 @@ public class GroupMapTest extends TestCase {
|
|||
* Test a new map is populated for existing values
|
||||
*/
|
||||
public void testMapUpdatedOnStart() throws Exception {
|
||||
GroupMap map1 = new GroupMap(connection,"map1");
|
||||
GroupMap map1 = new GroupMap(connection1,"map1");
|
||||
final AtomicBoolean called = new AtomicBoolean();
|
||||
|
||||
map1.start();
|
||||
map1.put("test", "foo");
|
||||
GroupMap map2 = new GroupMap(connection,"map2");
|
||||
map2.addMapChangedListener(new MapChangedListener(){
|
||||
public void mapChanged(Member owner, Object key, Object oldValue,
|
||||
Object newValue) {
|
||||
GroupMap map2 = new GroupMap(connection2,"map2");
|
||||
map2.addMapChangedListener(new DefaultMapChangedListener() {
|
||||
public void mapInsert(Member owner,Object Key, Object Value) {
|
||||
synchronized(called) {
|
||||
called.set(true);
|
||||
called.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
map2.start();
|
||||
|
||||
|
@ -190,20 +232,18 @@ public class GroupMapTest extends TestCase {
|
|||
}
|
||||
|
||||
public void testContainsKey() throws Exception {
|
||||
GroupMap map1 = new GroupMap(connection,"map1");
|
||||
GroupMap map1 = new GroupMap(connection1,"map1");
|
||||
final AtomicBoolean called = new AtomicBoolean();
|
||||
map1.addMapChangedListener(new MapChangedListener(){
|
||||
public void mapChanged(Member owner, Object key, Object oldValue,
|
||||
Object newValue) {
|
||||
map1.addMapChangedListener(new DefaultMapChangedListener() {
|
||||
public void mapInsert(Member owner,Object Key, Object Value) {
|
||||
synchronized(called) {
|
||||
called.set(true);
|
||||
called.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
map1.start();
|
||||
GroupMap map2 = new GroupMap(connection,"map2");
|
||||
GroupMap map2 = new GroupMap(connection2,"map2");
|
||||
map2.start();
|
||||
map2.put("test","foo");
|
||||
synchronized(called) {
|
||||
|
@ -225,20 +265,18 @@ public class GroupMapTest extends TestCase {
|
|||
* @throws Exception
|
||||
*/
|
||||
public void testContainsValue() throws Exception {
|
||||
GroupMap map1 = new GroupMap(connection,"map1");
|
||||
GroupMap map1 = new GroupMap(connection1,"map1");
|
||||
final AtomicBoolean called = new AtomicBoolean();
|
||||
map1.addMapChangedListener(new MapChangedListener(){
|
||||
public void mapChanged(Member owner, Object key, Object oldValue,
|
||||
Object newValue) {
|
||||
map1.addMapChangedListener(new DefaultMapChangedListener() {
|
||||
public void mapInsert(Member owner,Object Key, Object Value) {
|
||||
synchronized(called) {
|
||||
called.set(true);
|
||||
called.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
map1.start();
|
||||
GroupMap map2 = new GroupMap(connection,"map2");
|
||||
GroupMap map2 = new GroupMap(connection2,"map2");
|
||||
map2.start();
|
||||
map2.put("test","foo");
|
||||
synchronized(called) {
|
||||
|
@ -265,20 +303,18 @@ public class GroupMapTest extends TestCase {
|
|||
* @throws Exception
|
||||
*/
|
||||
public void testGet() throws Exception {
|
||||
GroupMap map1 = new GroupMap(connection,"map1");
|
||||
GroupMap map1 = new GroupMap(connection1,"map1");
|
||||
final AtomicBoolean called = new AtomicBoolean();
|
||||
map1.addMapChangedListener(new MapChangedListener(){
|
||||
public void mapChanged(Member owner, Object key, Object oldValue,
|
||||
Object newValue) {
|
||||
map1.addMapChangedListener(new DefaultMapChangedListener() {
|
||||
public void mapInsert(Member owner,Object Key, Object Value) {
|
||||
synchronized(called) {
|
||||
called.set(true);
|
||||
called.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
map1.start();
|
||||
GroupMap map2 = new GroupMap(connection,"map2");
|
||||
GroupMap map2 = new GroupMap(connection2,"map2");
|
||||
map2.start();
|
||||
map2.put("test","foo");
|
||||
synchronized(called) {
|
||||
|
@ -299,20 +335,25 @@ public class GroupMapTest extends TestCase {
|
|||
* {@link org.apache.activemq.group.GroupMap#remove(java.lang.Object)}.
|
||||
*/
|
||||
public void testRemove() throws Exception{
|
||||
GroupMap map1 = new GroupMap(connection,"map1");
|
||||
GroupMap map1 = new GroupMap(connection1,"map1");
|
||||
final AtomicBoolean called = new AtomicBoolean();
|
||||
map1.addMapChangedListener(new MapChangedListener(){
|
||||
public void mapChanged(Member owner, Object key, Object oldValue,
|
||||
Object newValue) {
|
||||
map1.addMapChangedListener(new DefaultMapChangedListener() {
|
||||
public void mapInsert(Member owner,Object Key, Object Value) {
|
||||
synchronized(called) {
|
||||
called.set(true);
|
||||
called.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
public void mapRemove(Member owner, Object key, Object value,boolean expired) {
|
||||
synchronized(called) {
|
||||
called.set(true);
|
||||
called.notifyAll();
|
||||
}
|
||||
}
|
||||
});
|
||||
map1.start();
|
||||
GroupMap map2 = new GroupMap(connection,"map2");
|
||||
GroupMap map2 = new GroupMap(connection2,"map2");
|
||||
map2.start();
|
||||
map2.put("test","foo");
|
||||
synchronized(called) {
|
||||
|
@ -330,6 +371,53 @@ public class GroupMapTest extends TestCase {
|
|||
}
|
||||
}
|
||||
assertTrue(map1.isEmpty());
|
||||
|
||||
map1.stop();
|
||||
map2.stop();
|
||||
}
|
||||
|
||||
public void testExpire() throws Exception{
|
||||
final AtomicBoolean called1 = new AtomicBoolean();
|
||||
final AtomicBoolean called2 = new AtomicBoolean();
|
||||
|
||||
GroupMap map1 = new GroupMap(connection1,"map1");
|
||||
map1.setTimeToLive(1000);
|
||||
map1.addMapChangedListener(new DefaultMapChangedListener() {
|
||||
public void mapRemove(Member owner, Object key, Object value,boolean expired) {
|
||||
synchronized(called1) {
|
||||
called1.set(expired);
|
||||
called1.notifyAll();
|
||||
}
|
||||
}
|
||||
});
|
||||
map1.start();
|
||||
|
||||
GroupMap map2 = new GroupMap(connection2,"map2");
|
||||
|
||||
map2.addMapChangedListener(new DefaultMapChangedListener() {
|
||||
public void mapRemove(Member owner, Object key, Object value,boolean expired) {
|
||||
synchronized(called2) {
|
||||
called2.set(expired);
|
||||
called2.notifyAll();
|
||||
}
|
||||
}
|
||||
});
|
||||
map2.start();
|
||||
|
||||
|
||||
map1.put("test", "blob");
|
||||
synchronized(called1) {
|
||||
if (!called1.get()) {
|
||||
called1.wait(5000);
|
||||
}
|
||||
}
|
||||
synchronized(called2) {
|
||||
if (!called2.get()) {
|
||||
called2.wait(5000);
|
||||
}
|
||||
}
|
||||
assertTrue(called1.get());
|
||||
assertTrue(called2.get());
|
||||
map1.stop();
|
||||
map2.stop();
|
||||
}
|
||||
|
@ -339,14 +427,17 @@ public class GroupMapTest extends TestCase {
|
|||
broker = createBroker();
|
||||
}
|
||||
ConnectionFactory factory = createConnectionFactory();
|
||||
connection = factory.createConnection();
|
||||
connection.start();
|
||||
connection1 = factory.createConnection();
|
||||
connection1.start();
|
||||
connection2 = factory.createConnection();
|
||||
connection2.start();
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
connection.close();
|
||||
connection1.close();
|
||||
connection2.close();
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue