added groups

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@702452 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-10-07 12:31:59 +00:00
parent f8071a34a4
commit 0706a4ce7c
25 changed files with 4031 additions and 9 deletions

View File

@ -449,9 +449,11 @@
<exclude>**/NetworkConnectionsCleanedupTest.*/**</exclude> <exclude>**/NetworkConnectionsCleanedupTest.*/**</exclude>
<exclude>**/amq1490/*</exclude> <exclude>**/amq1490/*</exclude>
<exclude>**/AMQ1925/*</exclude>
<exclude>**/archive/*</exclude> <exclude>**/archive/*</exclude>
<exclude>**/NetworkFailoverTest.*/**</exclude> <exclude>**/NetworkFailoverTest.*/**</exclude>
<exclude>**/AMQDeadlockTest3.*</exclude> <exclude>**/AMQDeadlockTest3.*</exclude>
</excludes> </excludes>
</configuration> </configuration>

View File

@ -30,8 +30,8 @@ public class AMQStoreDurableTopicTest extends SimpleDurableTopicTest {
dataFileDir.mkdirs(); dataFileDir.mkdirs();
answer.setDeleteAllMessagesOnStartup(true); answer.setDeleteAllMessagesOnStartup(true);
AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter(); AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
adaptor.setArchiveDataLogs(true); //adaptor.setArchiveDataLogs(true);
adaptor.setMaxFileLength(1024 * 64); //adaptor.setMaxFileLength(1024 * 64);
answer.setDataDirectoryFile(dataFileDir); answer.setDataDirectoryFile(dataFileDir);
answer.setPersistenceAdapter(adaptor); answer.setPersistenceAdapter(adaptor);

View File

@ -30,8 +30,8 @@ import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
public class SimpleDurableTopicTest extends SimpleTopicTest { public class SimpleDurableTopicTest extends SimpleTopicTest {
protected void setUp() throws Exception { protected void setUp() throws Exception {
numberOfDestinations=1; numberOfDestinations=10;
numberOfConsumers = 2; numberOfConsumers = 10;
numberofProducers = 2; numberofProducers = 2;
sampleCount=1000; sampleCount=1000;
playloadSize = 1024; playloadSize = 1024;
@ -44,7 +44,7 @@ public class SimpleDurableTopicTest extends SimpleTopicTest {
persistenceFactory.setPersistentIndex(true); persistenceFactory.setPersistentIndex(true);
persistenceFactory.setCleanupInterval(10000); persistenceFactory.setCleanupInterval(10000);
answer.setPersistenceFactory(persistenceFactory); answer.setPersistenceFactory(persistenceFactory);
answer.setDeleteAllMessagesOnStartup(true); //answer.setDeleteAllMessagesOnStartup(true);
answer.addConnector(uri); answer.addConnector(uri);
answer.setUseShutdownHook(false); answer.setUseShutdownHook(false);
} }
@ -63,7 +63,7 @@ public class SimpleDurableTopicTest extends SimpleTopicTest {
protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception { protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception {
ActiveMQConnectionFactory result = super.createConnectionFactory(uri); ActiveMQConnectionFactory result = super.createConnectionFactory(uri);
result.setSendAcksAsync(false); //result.setSendAcksAsync(false);
return result; return result;
} }

View File

@ -60,7 +60,7 @@ public class AMQ1925Test extends TestCase {
private URI tcpUri; private URI tcpUri;
private ActiveMQConnectionFactory cf; private ActiveMQConnectionFactory cf;
public void testAMQ1925_TXInProgress() throws Exception { public void XtestAMQ1925_TXInProgress() throws Exception {
Connection connection = cf.createConnection(); Connection connection = cf.createConnection();
connection.start(); connection.start();
Session session = connection.createSession(true, Session session = connection.createSession(true,
@ -372,8 +372,8 @@ public class AMQ1925Test extends TestCase {
protected void setUp() throws Exception { protected void setUp() throws Exception {
bs = new BrokerService(); bs = new BrokerService();
bs.setDeleteAllMessagesOnStartup(true);
bs.setPersistent(true); bs.setPersistent(true);
bs.deleteAllMessages();
bs.setUseJmx(true); bs.setUseJmx(true);
TransportConnector connector = bs.addConnector("tcp://localhost:0"); TransportConnector connector = bs.addConnector("tcp://localhost:0");
bs.start(); bs.start();

View File

@ -18,7 +18,7 @@
# #
# The logging properties used during tests.. # The logging properties used during tests..
# #
log4j.rootLogger=INFO, out log4j.rootLogger=INFO, out, stdout
log4j.logger.org.apache.activemq.spring=WARN log4j.logger.org.apache.activemq.spring=WARN

93
activemq-groups/pom.xml Executable file
View File

@ -0,0 +1,93 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.3-SNAPSHOT</version>
</parent>
<artifactId>activemq-groups</artifactId>
<packaging>bundle</packaging>
<name>ActiveMQ :: Groups</name>
<description>A JMS based collaboration framework</description>
<properties>
<activemq.osgi.import.pkg>*</activemq.osgi.import.pkg>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.3</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring</artifactId>
<version>2.5.4</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.14</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.5</source>
<target>1.5</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups;
/**
* Default implementation of a MapChangedListener
*
*/
public class DefaultMapChangedListener implements GroupStateChangedListener{
public void mapInsert(Member owner, Object key, Object value) {
}
public void mapRemove(Member owner, Object key, Object value,boolean expired) {
}
public void mapUpdate(Member owner, Object key, Object oldValue,Object newValue) {
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups;
/**
* A listener for message communication
*
*/
public interface GroupMessageListener {
/**
* Called when a message is delivered to the <CODE>Group</CODE> from another member
* @param sender the member who sent the message
* @param replyId the id to use to respond to a message
* @param message the message object
*/
void messageDelivered(Member sender, String replyId, Object message);
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups;
/**
*Get notifications about changes to the state of the map
*
*/
public interface GroupStateChangedListener {
/**
* Called when a key/value pair is inserted into the map
* @param owner
* @param key
* @param value
*/
void mapInsert(Member owner,Object key, Object value);
/**
* Called when a key value is updated in the map
* @param owner
* @param Key
* @param oldValue
* @param newValue
*/
void mapUpdate(Member owner,Object Key,Object oldValue,Object newValue);
/**
* Called when a key value is removed from the map
* @param owner
* @param key
* @param value
* @param expired
*/
void mapRemove(Member owner,Object key, Object value,boolean expired);
}

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups;
/**
* thrown when updating a key to map that the local client doesn't own
*
*/
public class GroupUpdateException extends RuntimeException {
private static final long serialVersionUID = -7584658017201604560L;
/**
* @param message
*/
public GroupUpdateException(String message) {
super(message);
}
}

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import javax.jms.Destination;
import org.apache.activemq.util.IdGenerator;
/**
*<P>
* A <CODE>Member</CODE> holds information about a member of the group
*
*/
public class Member implements Externalizable {
private String name;
private String id;
private String hostname;
private long startTime;
private int coordinatorWeight;
private Destination inBoxDestination;
private transient long timeStamp;
/**
* Default constructor - only used by serialization
*/
public Member() {
}
/**
* @param name
*/
public Member(String name) {
this.name = name;
this.hostname = IdGenerator.getHostName();
this.startTime=System.currentTimeMillis();
}
/**
* @return the name
*/
public String getName() {
return this.name;
}
/**
* @return the id
*/
public String getId() {
return this.id;
}
void setId(String id) {
this.id=id;
}
/**
* @return the hostname
*/
public String getHostname() {
return this.hostname;
}
/**
* @return the startTime
*/
public long getStartTime() {
return this.startTime;
}
/**
* @return the inbox destination
*/
public Destination getInBoxDestination() {
return this.inBoxDestination;
}
void setInBoxDestination(Destination dest) {
this.inBoxDestination=dest;
}
/**
* @return the timeStamp
*/
long getTimeStamp() {
return this.timeStamp;
}
/**
* @param timeStamp the timeStamp to set
*/
void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
/**
* @return the coordinatorWeight
*/
public int getCoordinatorWeight() {
return this.coordinatorWeight;
}
/**
* @param coordinatorWeight the coordinatorWeight to set
*/
public void setCoordinatorWeight(int coordinatorWeight) {
this.coordinatorWeight = coordinatorWeight;
}
public String toString() {
return this.name+"["+this.id+"]@"+this.hostname;
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
this.coordinatorWeight=in.readInt();;
this.name = in.readUTF();
this.id = in.readUTF();
this.hostname = in.readUTF();
this.startTime=in.readLong();
this.inBoxDestination=(Destination) in.readObject();
this.coordinatorWeight=in.readInt();
}
public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(this.coordinatorWeight);
out.writeUTF(this.name != null ? this.name : "");
out.writeUTF(this.id != null ? this.id : "");
out.writeUTF(this.hostname != null ? this.hostname : "");
out.writeLong(this.startTime);
out.writeObject(this.inBoxDestination);
out.writeInt(this.coordinatorWeight);
}
public int hashCode() {
return this.id.hashCode();
}
public boolean equals(Object obj) {
boolean result = false;
if (obj instanceof Member) {
Member other = (Member)obj;
result = this.id.equals(other.id);
}
return result;
}
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups;
/**
* A listener for membership changes to a group
*
*/
public interface MemberChangedListener {
/**
* Notification a member has started
* @param member
*/
void memberStarted(Member member);
/**
* Notification a member has stopped
* @param member
*/
void memberStopped(Member member);
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups.command;
import java.util.HashSet;
import java.util.Set;
/**
* Return information about map update
*
*/
public class AsyncMapRequest implements RequestCallback{
private final Object mutex = new Object();
private Set<String> requests = new HashSet<String>();
public void add(String id, MapRequest request) {
request.setCallback(this);
this.requests.add(id);
}
/**
* Wait for requests
* @param timeout
* @return
*/
public boolean isSuccess(long timeout) {
long deadline = System.currentTimeMillis() + timeout;
while (!this.requests.isEmpty()) {
synchronized (this.mutex) {
try {
this.mutex.wait(timeout);
} catch (InterruptedException e) {
break;
}
}
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
}
return this.requests.isEmpty();
}
public void finished(String id) {
this.requests.remove(id);
}
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups.command;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.apache.activegroups.Member;
/**
* Used to pass information around
*
*/
public class ElectionMessage implements Externalizable{
public static enum MessageType{ELECTION,ANSWER,COORDINATOR};
private Member member;
private MessageType type;
/**
* @return the member
*/
public Member getMember() {
return this.member;
}
/**
* @param member the member to set
*/
public void setMember(Member member) {
this.member = member;
}
/**
* @return the type
*/
public MessageType getType() {
return this.type;
}
/**
* @param type the type to set
*/
public void setType(MessageType type) {
this.type = type;
}
/**
* @return true if election message
*/
public boolean isElection() {
return this.type != null && this.type.equals(MessageType.ELECTION);
}
/**
* @return true if answer message
*/
public boolean isAnswer() {
return this.type != null && this.type.equals(MessageType.ANSWER);
}
/**
* @return true if coordinator message
*/
public boolean isCoordinator() {
return this.type != null && this.type.equals(MessageType.COORDINATOR);
}
public ElectionMessage copy() {
ElectionMessage result = new ElectionMessage();
result.member=this.member;
result.type=this.type;
return result;
}
public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
this.member=(Member) in.readObject();
this.type=(MessageType) in.readObject();
}
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(this.member);
out.writeObject(this.type);
}
public String toString() {
return "ElectionMessage: "+ this.member + "{"+this.type+ "}";
}
}

View File

@ -0,0 +1,210 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups.command;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.apache.activegroups.Member;
/**
* Holds information about an EntryKey
*
*/
public class EntryKey<K> implements Externalizable {
private Member owner;
private K key;
private boolean locked;
private boolean removeOnExit;
private boolean releaseLockOnExit;
private long expiration;
private long lockExpiration;
/**
* Default constructor - for serialization
*/
public EntryKey() {
}
public EntryKey(Member owner, K key) {
this.owner = owner;
this.key = key;
}
public int hashCode() {
return this.key != null ? this.key.hashCode() : super.hashCode();
}
/**
* @return the owner
*/
public Member getOwner() {
return this.owner;
}
public void setOwner(Member member) {
this.owner=member;
}
/**
* @return the key
*/
public K getKey() {
return this.key;
}
/**
* @return the share
*/
public boolean isLocked() {
return this.locked;
}
/**
* @param share the share to set
*/
public void setLocked(boolean locked) {
this.locked = locked;
}
/**
* @return the removeOnExit
*/
public boolean isRemoveOnExit() {
return this.removeOnExit;
}
/**
* @param removeOnExit
* the removeOnExit to set
*/
public void setRemoveOnExit(boolean removeOnExit) {
this.removeOnExit = removeOnExit;
}
/**
* @return the expiration
*/
public long getExpiration() {
return expiration;
}
/**
* @param expiration the expiration to set
*/
public void setExpiration(long expiration) {
this.expiration = expiration;
}
/**
* @return the lockExpiration
*/
public long getLockExpiration() {
return lockExpiration;
}
/**
* @param lockExpiration the lockExpiration to set
*/
public void setLockExpiration(long lockExpiration) {
this.lockExpiration = lockExpiration;
}
/**
* @return the releaseLockOnExit
*/
public boolean isReleaseLockOnExit() {
return releaseLockOnExit;
}
/**
* @param releaseLockOnExit the releaseLockOnExit to set
*/
public void setReleaseLockOnExit(boolean releaseLockOnExit) {
this.releaseLockOnExit = releaseLockOnExit;
}
public void setTimeToLive(long ttl) {
if (ttl > 0 ) {
this.expiration=ttl+System.currentTimeMillis();
}else {
this.expiration =0l;
}
}
public void setLockLeaseTime(long ttl) {
if(ttl > 0) {
this.lockExpiration=ttl+System.currentTimeMillis();
}else {
this.lockExpiration=0l;
}
}
public boolean isExpired() {
return isExpired(System.currentTimeMillis());
}
public boolean isExpired(long currentTime) {
return this.expiration > 0 && this.expiration < currentTime;
}
public boolean isLockExpired() {
return isLockExpired(System.currentTimeMillis());
}
public boolean isLockExpired(long currentTime) {
return this.lockExpiration > 0 && this.lockExpiration < currentTime;
}
public boolean equals(Object obj) {
boolean result = false;
if (obj instanceof EntryKey) {
EntryKey other = (EntryKey) obj;
result = other.key.equals(this.key);
}
return result;
}
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(this.owner);
out.writeObject(this.key);
out.writeBoolean(isLocked());
out.writeBoolean(isRemoveOnExit());
out.writeBoolean(isReleaseLockOnExit());
out.writeLong(getExpiration());
out.writeLong(getLockExpiration());
}
public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
this.owner = (Member) in.readObject();
this.key = (K) in.readObject();
this.locked = in.readBoolean();
this.removeOnExit=in.readBoolean();
this.releaseLockOnExit=in.readBoolean();
this.expiration=in.readLong();
this.lockExpiration=in.readLong();
}
public String toString() {
return "key:"+this.key;
}
}

View File

@ -0,0 +1,204 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups.command;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
/**
* Used to pass information around
*
*/
public class EntryMessage implements Externalizable{
public static enum MessageType{INSERT,DELETE,SYNC};
private EntryKey key;
private Object value;
private Object oldValue;
private MessageType type;
private boolean mapUpdate;
private boolean expired;
private boolean lockExpired;
private boolean lockUpdate;
/**
* @return the owner
*/
public EntryKey getKey() {
return this.key;
}
/**
* @param key
*/
public void setKey(EntryKey key) {
this.key = key;
}
/**
* @return the value
*/
public Object getValue() {
return this.value;
}
/**
* @param value the value to set
*/
public void setValue(Object value) {
this.value = value;
}
/**
* @return the oldValue
*/
public Object getOldValue() {
return this.oldValue;
}
/**
* @param oldValue the oldValue to set
*/
public void setOldValue(Object oldValue) {
this.oldValue = oldValue;
}
/**
* @return the type
*/
public MessageType getType() {
return this.type;
}
/**
* @param type the type to set
*/
public void setType(MessageType type) {
this.type = type;
}
/**
* @return the mapUpdate
*/
public boolean isMapUpdate() {
return this.mapUpdate;
}
/**
* @param mapUpdate the mapUpdate to set
*/
public void setMapUpdate(boolean mapUpdate) {
this.mapUpdate = mapUpdate;
}
/**
* @return the expired
*/
public boolean isExpired() {
return expired;
}
/**
* @param expired the expired to set
*/
public void setExpired(boolean expired) {
this.expired = expired;
}
/**
* @return the lockExpired
*/
public boolean isLockExpired() {
return lockExpired;
}
/**
* @param lockExpired the lockExpired to set
*/
public void setLockExpired(boolean lockExpired) {
this.lockExpired = lockExpired;
}
/**
* @return the lockUpdate
*/
public boolean isLockUpdate() {
return lockUpdate;
}
/**
* @param lockUpdate the lockUpdate to set
*/
public void setLockUpdate(boolean lockUpdate) {
this.lockUpdate = lockUpdate;
}
/**
* @return if insert message
*/
public boolean isInsert() {
return this.type != null && this.type.equals(MessageType.INSERT);
}
/**
* @return true if delete message
*/
public boolean isDelete() {
return this.type != null && this.type.equals(MessageType.DELETE);
}
public boolean isSync() {
return this.type != null && this.type.equals(MessageType.SYNC);
}
public EntryMessage copy() {
EntryMessage result = new EntryMessage();
result.key=this.key;
result.value=this.value;
result.oldValue=this.oldValue;
result.type=this.type;
result.mapUpdate=this.mapUpdate;
result.expired=this.expired;
result.lockExpired=this.lockExpired;
result.lockUpdate=this.lockUpdate;
return result;
}
public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
this.key=(EntryKey) in.readObject();
this.value=in.readObject();
this.oldValue=in.readObject();
this.type=(MessageType) in.readObject();
this.mapUpdate=in.readBoolean();
this.expired=in.readBoolean();
this.lockExpired=in.readBoolean();
this.lockUpdate=in.readBoolean();
}
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(this.key);
out.writeObject(this.value);
out.writeObject(this.oldValue);
out.writeObject(this.type);
out.writeBoolean(this.mapUpdate);
out.writeBoolean(this.expired);
out.writeBoolean(this.lockExpired);
out.writeBoolean(this.lockUpdate);
}
public String toString() {
return "EntryMessage: "+this.type + "[" + this.key + "," + this.value +
"]{update=" + this.mapUpdate + "}";
}
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY VIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups.command;
/**
* Holds information about the Value in the Map
*
*/
public class EntryValue<V> {
private EntryKey key;
private V value;
public EntryValue(EntryKey key, V value){
this.key=key;
this.value=value;
}
/**
* @return the owner
*/
public EntryKey getKey() {
return this.key;
}
/**
* @return the key
*/
public V getValue() {
return this.value;
}
/**
* set the value
* @param value
*/
public void setValue(V value) {
this.value=value;
}
public int hashCode() {
return this.value != null ? this.value.hashCode() : super.hashCode();
}
public boolean equals(Object obj) {
boolean result = false;
if (obj instanceof EntryValue) {
EntryValue other = (EntryValue)obj;
result = (this.value==null && other.value==null) ||
(this.value != null && other.value != null && this.value.equals(other.value));
}
return result;
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups.command;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Return information about map update
*
*/
public class MapRequest {
private static final Log LOG = LogFactory.getLog(MapRequest.class);
private final AtomicBoolean done = new AtomicBoolean();
private Object response;
private RequestCallback callback;
public Object get(long timeout) {
synchronized (this.done) {
if (this.done.get() == false && this.response == null) {
try {
this.done.wait(timeout);
} catch (InterruptedException e) {
LOG.warn("Interrupted in get("+timeout+")",e);
}
}
}
return this.response;
}
public void put(String id,Object response) {
this.response = response;
cancel();
RequestCallback callback = this.callback;
if (callback != null) {
callback.finished(id);
}
}
public void cancel() {
this.done.set(true);
synchronized (this.done) {
this.done.notifyAll();
}
}
public void setCallback(RequestCallback callback) {
this.callback=callback;
}
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups.command;
/**
* Return information about map update
*
*/
public interface RequestCallback{
/**
* Optionally called when a request is finished
* @param id
*/
void finished(String id);
}

View File

@ -0,0 +1,25 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
Shared state, messaging and membership information between members of a distributed group
</body>
</html>

View File

@ -0,0 +1,37 @@
## ------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ------------------------------------------------------------------------
#
# The logging properties used for eclipse testing, We want to see debug output on the console.
#
log4j.rootLogger=INFO, out
# CONSOLE appender not used by default
log4j.appender.out=org.apache.log4j.ConsoleAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
# File appender
log4j.appender.fout=org.apache.log4j.FileAppender
log4j.appender.fout.layout=org.apache.log4j.PatternLayout
log4j.appender.fout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
log4j.appender.fout.file=target/amq-testlog.log
log4j.appender.fout.append=true

View File

@ -0,0 +1,155 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups;
import java.util.ArrayList;
import java.util.List;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
public class GroupMemberTest extends TestCase {
protected BrokerService broker;
protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
public void testCoordinatorSelection() throws Exception{
Group group = new Group(null,"");
List<Member>list = new ArrayList<Member>();
final int number =10;
Member choosen = null;
for (int i =0;i< number;i++) {
Member m = new Member("group"+i);
m.setId(""+i);
if (number/2==i) {
m.setCoordinatorWeight(10);
choosen=m;
}
list.add(m);
}
Member c = group.selectCordinator(list);
assertEquals(c,choosen);
}
/**
* Test method for
* {@link org.apache.activemq.group.Group#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}.
* @throws Exception
*/
public void testGroup() throws Exception {
final int number = 10;
List<Group>groupMaps = new ArrayList<Group>();
ConnectionFactory factory = createConnectionFactory();
for (int i =0; i < number; i++) {
Connection connection = factory.createConnection();
Group map = new Group(connection,"map"+i);
map.setHeartBeatInterval(200);
map.setMinimumGroupSize(i+1);
map.start();
groupMaps.add(map);
}
int coordinatorNumber = 0;
for (Group map:groupMaps) {
if (map.isCoordinator()) {
coordinatorNumber++;
}
}
for(Group map:groupMaps) {
map.stop();
}
}
public void testWeightedGroup() throws Exception {
final int number = 10;
List<Group>groupMaps = new ArrayList<Group>();
Group last = null;
ConnectionFactory factory = createConnectionFactory();
for (int i =0; i < number; i++) {
Connection connection = factory.createConnection();
Group map = new Group(connection,"map"+i);
if(i ==number/2) {
map.setCoordinatorWeight(10);
last=map;
}
map.setMinimumGroupSize(i+1);
map.start();
groupMaps.add(map);
}
Thread.sleep(2000);
int coordinator = 0;
Group groupCoordinator = null;
for (Group map:groupMaps) {
if (map.isCoordinator()) {
coordinator++;
groupCoordinator=map;
}
}
assertNotNull(groupCoordinator);
assertEquals(1,coordinator);
assertEquals(last.getName(),groupCoordinator.getName());
for(Group map:groupMaps) {
map.stop();
}
}
protected void setUp() throws Exception {
if (broker == null) {
broker = createBroker();
}
super.setUp();
}
protected void tearDown() throws Exception {
super.tearDown();
if (broker != null) {
broker.stop();
}
}
protected ActiveMQConnectionFactory createConnectionFactory()throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);
return cf;
}
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
configureBroker(answer);
answer.start();
return answer;
}
protected void configureBroker(BrokerService answer) throws Exception {
answer.setPersistent(false);
answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true);
}
}

View File

@ -0,0 +1,230 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
public class GroupMessageTest extends TestCase {
protected BrokerService broker;
protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
public void testGroupBroadcast() throws Exception {
final int number = 10;
final AtomicInteger count = new AtomicInteger();
List<Group> groups = new ArrayList<Group>();
ConnectionFactory factory = createConnectionFactory();
for (int i = 0; i < number; i++) {
Connection connection = factory.createConnection();
Group group = new Group(connection, "group" + i);
group.setMinimumGroupSize(i+1);
group.start();
groups.add(group);
group.addGroupMessageListener(new GroupMessageListener() {
public void messageDelivered(Member sender, String replyId,
Object message) {
synchronized (count) {
if (count.incrementAndGet() == number) {
count.notifyAll();
}
}
}
});
}
groups.get(0).broadcastMessage("hello");
synchronized (count) {
if (count.get() < number) {
count.wait(5000);
}
}
assertEquals(number, count.get());
for (Group map : groups) {
map.stop();
}
}
public void testsendMessage() throws Exception {
final int number = 10;
final AtomicInteger count = new AtomicInteger();
List<Group> groups = new ArrayList<Group>();
ConnectionFactory factory = createConnectionFactory();
for (int i = 0; i < number; i++) {
Connection connection = factory.createConnection();
Group group = new Group(connection, "group" + i);
group.setMinimumGroupSize(i+1);
group.start();
groups.add(group);
group.addGroupMessageListener(new GroupMessageListener() {
public void messageDelivered(Member sender, String replyId,
Object message) {
synchronized (count) {
count.incrementAndGet();
count.notifyAll();
}
}
});
}
groups.get(0).sendMessage("hello");
synchronized (count) {
if (count.get() == 0) {
count.wait(5000);
}
}
// wait a while to check that only one got it
Thread.sleep(2000);
assertEquals(1, count.get());
for (Group map : groups) {
map.stop();
}
}
public void testSendToSingleMember() throws Exception {
ConnectionFactory factory = createConnectionFactory();
Connection connection1 = factory.createConnection();
Connection connection2 = factory.createConnection();
Group group1 = new Group(connection1, "group1");
final AtomicBoolean called = new AtomicBoolean();
group1.addGroupMessageListener(new GroupMessageListener() {
public void messageDelivered(Member sender, String replyId,
Object message) {
synchronized (called) {
called.set(true);
called.notifyAll();
}
}
});
group1.start();
Group group2 = new Group(connection2, "group2");
group2.setMinimumGroupSize(2);
group2.start();
Member member1 = group2.getMemberByName("group1");
group2.sendMessage(member1, "hello");
synchronized (called) {
if (!called.get()) {
called.wait(5000);
}
}
assertTrue(called.get());
group1.stop();
group2.stop();
}
public void testSendRequestReply() throws Exception {
ConnectionFactory factory = createConnectionFactory();
Connection connection1 = factory.createConnection();
Connection connection2 = factory.createConnection();
final int number = 1000;
final AtomicInteger requestCount = new AtomicInteger();
final AtomicInteger replyCount = new AtomicInteger();
final List<String> requests = new ArrayList<String>();
final List<String> replies = new ArrayList<String>();
for (int i = 0; i < number; i++) {
requests.add("request" + i);
replies.add("reply" + i);
}
final Group group1 = new Group(connection1, "group1");
final AtomicBoolean finished = new AtomicBoolean();
group1.addGroupMessageListener(new GroupMessageListener() {
public void messageDelivered(Member sender, String replyId,
Object message) {
if (!replies.isEmpty()) {
String reply = replies.remove(0);
try {
group1.sendMessageResponse(sender, replyId, reply);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
group1.start();
final Group group2 = new Group(connection2, "group2");
group2.setMinimumGroupSize(2);
group2.addGroupMessageListener(new GroupMessageListener() {
public void messageDelivered(Member sender, String replyId,
Object message) {
if (!requests.isEmpty()) {
String request = requests.remove(0);
try {
group2.sendMessage(sender, request);
} catch (JMSException e) {
e.printStackTrace();
}
}else {
synchronized (finished) {
finished.set(true);
finished.notifyAll();
}
}
}
});
group2.start();
Member member1 = group2.getMemberByName("group1");
group2.sendMessage(member1, requests.remove(0));
synchronized (finished) {
if (!finished.get()) {
finished.wait(10000);
}
}
assertTrue(finished.get());
group1.stop();
group2.stop();
}
protected void setUp() throws Exception {
if (broker == null) {
broker = createBroker();
}
super.setUp();
}
protected void tearDown() throws Exception {
super.tearDown();
if (broker != null) {
broker.stop();
}
}
protected ActiveMQConnectionFactory createConnectionFactory()
throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);
return cf;
}
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
configureBroker(answer);
answer.start();
return answer;
}
protected void configureBroker(BrokerService answer) throws Exception {
answer.setPersistent(false);
answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true);
}
}

View File

@ -0,0 +1,548 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
public class GroupStateTest extends TestCase {
protected BrokerService broker;
protected Connection connection1;
protected Connection connection2;
protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
/**
* Test method for
* {@link org.apache.activemq.group.Group#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}.
* @throws Exception
*/
public void testAddMemberChangedListener() throws Exception {
final AtomicInteger counter = new AtomicInteger();
Group map1 = new Group(connection1,"map1");
map1.addMemberChangedListener(new MemberChangedListener(){
public void memberStarted(Member member) {
synchronized(counter) {
counter.incrementAndGet();
counter.notifyAll();
}
}
public void memberStopped(Member member) {
synchronized(counter) {
counter.decrementAndGet();
counter.notifyAll();
}
}
});
map1.start();
synchronized(counter) {
if (counter.get()<1) {
counter.wait(5000);
}
}
assertEquals(1, counter.get());
Group map2 = new Group(connection2,"map2");
map2.start();
synchronized(counter) {
if (counter.get()<2) {
counter.wait(5000);
}
}
assertEquals(2, counter.get());
map2.stop();
synchronized(counter) {
if (counter.get()>=2) {
counter.wait(Group.DEFAULT_HEART_BEAT_INTERVAL*3);
}
}
assertEquals(1, counter.get());
map1.stop();
}
/**
* Test method for
* {@link org.apache.activemq.group.Group#addMapChangedListener(org.apache.activemq.group.MapChangedListener)}.
* @throws Exception
*/
public void testAddMapChangedListener() throws Exception {
final AtomicBoolean called1 = new AtomicBoolean();
final AtomicBoolean called2 = new AtomicBoolean();
Group map1 = new Group(connection1,"map1");
map1.addMapChangedListener(new DefaultMapChangedListener() {
public void mapInsert(Member owner,Object Key, Object Value) {
synchronized(called1) {
called1.set(true);
called1.notifyAll();
}
}
});
map1.start();
Group map2 = new Group(connection2,"map2");
map2.addMapChangedListener(new DefaultMapChangedListener() {
public void mapInsert(Member owner,Object Key, Object Value) {
synchronized(called2) {
called2.set(true);
called2.notifyAll();
}
}
});
map2.start();
map1.put("test", "blob");
synchronized(called1) {
if (!called1.get()) {
called1.wait(5000);
}
}
synchronized(called2) {
if (!called2.get()) {
called2.wait(5000);
}
}
assertTrue(called1.get());
assertTrue(called2.get());
map1.stop();
map2.stop();
}
public void testGetImplicitWriteLock() throws Exception {
Group map1 = new Group(connection1, "map1");
final AtomicBoolean called = new AtomicBoolean();
map1.start();
Group map2 = new Group(connection2, "map2");
map2.setAlwaysLock(true);
map2.setMinimumGroupSize(2);
map2.start();
map2.put("test", "foo");
try {
map1.put("test", "bah");
fail("Should have thrown an exception!");
} catch (GroupUpdateException e) {
}
map1.stop();
map2.stop();
}
public void testExpireImplicitWriteLock() throws Exception {
Group map1 = new Group(connection1, "map1");
final AtomicBoolean called = new AtomicBoolean();
map1.start();
Group map2 = new Group(connection2, "map2");
map2.setAlwaysLock(true);
map2.setLockTimeToLive(1000);
map2.setMinimumGroupSize(2);
map2.start();
map2.put("test", "foo");
try {
map1.put("test", "bah");
fail("Should have thrown an exception!");
} catch (GroupUpdateException e) {
}
Thread.sleep(2000);
map1.put("test", "bah");
map1.stop();
map2.stop();
}
public void XtestExpireImplicitLockOnExit() throws Exception {
Group map1 = new Group(connection1, "map1");
final AtomicBoolean called = new AtomicBoolean();
map1.start();
Group map2 = new Group(connection2, "map2");
map2.setAlwaysLock(true);
map2.setMinimumGroupSize(2);
map2.start();
map2.put("test", "foo");
try {
map1.put("test", "bah");
fail("Should have thrown an exception!");
} catch (GroupUpdateException e) {
}
map2.stop();
map1.put("test", "bah");
map1.stop();
}
public void testGetExplicitWriteLock() throws Exception {
Group map1 = new Group(connection1, "map1");
map1.setAlwaysLock(true);
final AtomicBoolean called = new AtomicBoolean();
map1.start();
Group map2 = new Group(connection2, "map2");
map2.setAlwaysLock(true);
map2.setMinimumGroupSize(2);
map2.start();
map2.put("test", "foo");
map2.lock("test");
try {
map1.put("test", "bah");
fail("Should have thrown an exception!");
} catch (GroupUpdateException e) {
}
map2.unlock("test");
map1.lock("test");
try {
map2.lock("test");
fail("Should have thrown an exception!");
} catch (GroupUpdateException e) {
}
map1.stop();
map2.stop();
}
/**
* Test method for {@link org.apache.activemq.group.Group#clear()}.
*
* @throws Exception
*/
public void testClear() throws Exception {
Group map1 = new Group(connection1,"map1");
final AtomicBoolean called = new AtomicBoolean();
map1.addMapChangedListener(new DefaultMapChangedListener() {
public void mapInsert(Member owner,Object Key, Object Value) {
synchronized(called) {
called.set(true);
called.notifyAll();
}
}
public void mapRemove(Member owner, Object key, Object value,boolean expired) {
synchronized(called) {
called.set(true);
called.notifyAll();
}
}
});
map1.start();
Group map2 = new Group(connection2,"map2");
map2.start();
map2.put("test","foo");
synchronized(called) {
if (!called.get()) {
called.wait(5000);
}
}
assertTrue(called.get());
called.set(false);
assertTrue(map1.isEmpty()==false);
map2.clear();
synchronized(called) {
if (!called.get()) {
called.wait(5000);
}
}
assertTrue(map1.isEmpty());
map1.stop();
map2.stop();
}
/**
* Test a new map is populated for existing values
*/
public void testMapUpdatedOnStart() throws Exception {
Group map1 = new Group(connection1,"map1");
final AtomicBoolean called = new AtomicBoolean();
map1.start();
map1.put("test", "foo");
Group map2 = new Group(connection2,"map2");
map2.addMapChangedListener(new DefaultMapChangedListener() {
public void mapInsert(Member owner,Object Key, Object Value) {
synchronized(called) {
called.set(true);
called.notifyAll();
}
}
});
map2.start();
synchronized(called) {
if (!called.get()) {
called.wait(5000);
}
}
assertTrue(called.get());
called.set(false);
assertTrue(map2.containsKey("test"));
assertTrue(map2.containsValue("foo"));
map1.stop();
map2.stop();
}
public void testContainsKey() throws Exception {
Group map1 = new Group(connection1,"map1");
final AtomicBoolean called = new AtomicBoolean();
map1.addMapChangedListener(new DefaultMapChangedListener() {
public void mapInsert(Member owner,Object Key, Object Value) {
synchronized(called) {
called.set(true);
called.notifyAll();
}
}
});
map1.start();
Group map2 = new Group(connection2,"map2");
map2.start();
map2.put("test","foo");
synchronized(called) {
if (!called.get()) {
called.wait(5000);
}
}
assertTrue(called.get());
called.set(false);
assertTrue(map1.containsKey("test"));
map1.stop();
map2.stop();
}
/**
* Test method for
* {@link org.apache.activemq.group.Group#containsValue(java.lang.Object)}.
* @throws Exception
*/
public void testContainsValue() throws Exception {
Group map1 = new Group(connection1,"map1");
final AtomicBoolean called = new AtomicBoolean();
map1.addMapChangedListener(new DefaultMapChangedListener() {
public void mapInsert(Member owner,Object Key, Object Value) {
synchronized(called) {
called.set(true);
called.notifyAll();
}
}
});
map1.start();
Group map2 = new Group(connection2,"map2");
map2.start();
map2.put("test","foo");
synchronized(called) {
if (!called.get()) {
called.wait(5000);
}
}
assertTrue(called.get());
called.set(false);
assertTrue(map1.containsValue("foo"));
map1.stop();
map2.stop();
}
/**
* Test method for {@link org.apache.activemq.group.GroupMap#entrySet()}.
* @throws Exception
*/
/**
* Test method for
* {@link org.apache.activemq.group.Group#get(java.lang.Object)}.
* @throws Exception
*/
public void testGet() throws Exception {
Group map1 = new Group(connection1,"map1");
final AtomicBoolean called = new AtomicBoolean();
map1.addMapChangedListener(new DefaultMapChangedListener() {
public void mapInsert(Member owner,Object Key, Object Value) {
synchronized(called) {
called.set(true);
called.notifyAll();
}
}
});
map1.start();
Group map2 = new Group(connection2,"map2");
map2.start();
map2.put("test","foo");
synchronized(called) {
if (!called.get()) {
called.wait(5000);
}
}
assertTrue(called.get());
assertTrue(map1.get("test").equals("foo"));
map1.stop();
map2.stop();
}
public void testPut() throws Exception {
Group map1 = new Group(connection1,"map1");
map1.start();
Group map2 = new Group(connection2,"map2");
map2.setMinimumGroupSize(2);
map2.start();
Object value = map1.put("foo", "blob");
assertNull(value);
value = map1.put("foo", "blah");
assertEquals(value, "blob");
map1.stop();
map2.stop();
}
/**
* Test method for
* {@link org.apache.activemq.group.Group#remove(java.lang.Object)}.
*/
public void testRemove() throws Exception{
Group map1 = new Group(connection1,"map1");
final AtomicBoolean called = new AtomicBoolean();
map1.addMapChangedListener(new DefaultMapChangedListener() {
public void mapInsert(Member owner,Object Key, Object Value) {
synchronized(called) {
called.set(true);
called.notifyAll();
}
}
public void mapRemove(Member owner, Object key, Object value,boolean expired) {
synchronized(called) {
called.set(true);
called.notifyAll();
}
}
});
map1.start();
Group map2 = new Group(connection2,"map2");
map2.start();
map2.put("test","foo");
synchronized(called) {
if (!called.get()) {
called.wait(5000);
}
}
assertTrue(called.get());
called.set(false);
assertTrue(map1.isEmpty()==false);
map2.remove("test");
synchronized(called) {
if (!called.get()) {
called.wait(5000);
}
}
assertTrue(map1.isEmpty());
map1.stop();
map2.stop();
}
public void testExpire() throws Exception{
final AtomicBoolean called1 = new AtomicBoolean();
final AtomicBoolean called2 = new AtomicBoolean();
Group map1 = new Group(connection1,"map1");
map1.setTimeToLive(1000);
map1.addMapChangedListener(new DefaultMapChangedListener() {
public void mapRemove(Member owner, Object key, Object value,boolean expired) {
synchronized(called1) {
called1.set(expired);
called1.notifyAll();
}
}
});
map1.start();
Group map2 = new Group(connection2,"map2");
map2.addMapChangedListener(new DefaultMapChangedListener() {
public void mapRemove(Member owner, Object key, Object value,boolean expired) {
synchronized(called2) {
called2.set(expired);
called2.notifyAll();
}
}
});
map2.start();
map1.put("test", "blob");
synchronized(called1) {
if (!called1.get()) {
called1.wait(5000);
}
}
synchronized(called2) {
if (!called2.get()) {
called2.wait(5000);
}
}
assertTrue(called1.get());
assertTrue(called2.get());
map1.stop();
map2.stop();
}
protected void setUp() throws Exception {
if (broker == null) {
broker = createBroker();
}
ConnectionFactory factory = createConnectionFactory();
connection1 = factory.createConnection();
connection1.start();
connection2 = factory.createConnection();
connection2.start();
super.setUp();
}
protected void tearDown() throws Exception {
super.tearDown();
connection1.close();
connection2.close();
if (broker != null) {
broker.stop();
}
}
protected ActiveMQConnectionFactory createConnectionFactory()throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);
return cf;
}
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
configureBroker(answer);
answer.start();
return answer;
}
protected void configureBroker(BrokerService answer) throws Exception {
answer.setPersistent(false);
answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true);
}
}