mirror of https://github.com/apache/activemq.git
- Added support for distribution of destinations between clients
- Modify the support for composite destinations git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@411727 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
dc2919e8ca
commit
61fcd4cfd1
|
@ -45,6 +45,7 @@ public class JmsClientSupport extends JmsFactorySupport {
|
||||||
protected String sessAckMode = SESSION_AUTO_ACKNOWLEDGE;
|
protected String sessAckMode = SESSION_AUTO_ACKNOWLEDGE;
|
||||||
protected String destName = "TEST.FOO";
|
protected String destName = "TEST.FOO";
|
||||||
protected int destCount = 1;
|
protected int destCount = 1;
|
||||||
|
protected int destIndex = 0;
|
||||||
protected boolean destComposite = false;
|
protected boolean destComposite = false;
|
||||||
|
|
||||||
public ConnectionFactory createConnectionFactory() throws JMSException {
|
public ConnectionFactory createConnectionFactory() throws JMSException {
|
||||||
|
@ -78,14 +79,15 @@ public class JmsClientSupport extends JmsFactorySupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Destination[] createDestination() throws JMSException {
|
public Destination[] createDestination() throws JMSException {
|
||||||
Destination[] dest = new Destination[getDestCount()];
|
|
||||||
for (int i=0; i<getDestCount(); i++) {
|
|
||||||
dest[i] = createDestination(getDestName() + "." + i);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isDestComposite()) {
|
if (isDestComposite()) {
|
||||||
return new Destination[] {createDestination(getDestName() + ".>")};
|
return new Destination[] {createCompositeDestination(getDestName(), getDestCount())};
|
||||||
} else {
|
} else {
|
||||||
|
Destination[] dest = new Destination[getDestCount()];
|
||||||
|
for (int i=0; i<getDestCount(); i++) {
|
||||||
|
dest[i] = createDestination(getDestName() + "." + (getDestIndex() + i));
|
||||||
|
}
|
||||||
|
|
||||||
return dest;
|
return dest;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -100,6 +102,28 @@ public class JmsClientSupport extends JmsFactorySupport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Destination createCompositeDestination(String name, int count) throws JMSException {
|
||||||
|
String compDestName = "";
|
||||||
|
String simpleName;
|
||||||
|
|
||||||
|
if (name.startsWith("queue://")) {
|
||||||
|
simpleName = name.substring("queue://".length());
|
||||||
|
} else if (name.startsWith("topic://")) {
|
||||||
|
simpleName = name.substring("topic://".length());
|
||||||
|
} else {
|
||||||
|
simpleName = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
int i;
|
||||||
|
compDestName = name + ".0,"; // First destination
|
||||||
|
for (i=1; i<count-1; i++) {
|
||||||
|
compDestName += (simpleName + "." + i +",");
|
||||||
|
}
|
||||||
|
compDestName += (simpleName + "." + i); // Last destination (minus the comma)
|
||||||
|
|
||||||
|
return createDestination(compDestName);
|
||||||
|
}
|
||||||
|
|
||||||
public String getSpiClass() {
|
public String getSpiClass() {
|
||||||
return spiClass;
|
return spiClass;
|
||||||
}
|
}
|
||||||
|
@ -140,6 +164,14 @@ public class JmsClientSupport extends JmsFactorySupport {
|
||||||
this.destCount = destCount;
|
this.destCount = destCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getDestIndex() {
|
||||||
|
return destIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDestIndex(int destIndex) {
|
||||||
|
this.destIndex = destIndex;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isDestComposite() {
|
public boolean isDestComposite() {
|
||||||
return destComposite;
|
return destComposite;
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,12 @@ public abstract class JmsClientSystemSupport {
|
||||||
private static final Log log = LogFactory.getLog(JmsClientSystemSupport.class);
|
private static final Log log = LogFactory.getLog(JmsClientSystemSupport.class);
|
||||||
|
|
||||||
public static final String PREFIX_CONFIG_SYSTEM_TEST = "sysTest.";
|
public static final String PREFIX_CONFIG_SYSTEM_TEST = "sysTest.";
|
||||||
|
public static final String DEST_DISTRO_ALL = "all"; // Each client will send/receive to all destination;
|
||||||
|
public static final String DEST_DISTRO_EQUAL = "equal"; // Equally divide the number of destinations to the number of clients
|
||||||
|
public static final String DEST_DISTRO_DIVIDE = "divide"; // Divide the destination among the clients, even if some have more destination than others
|
||||||
|
|
||||||
|
protected static final String KEY_CLIENT_DEST_COUNT = "client.destCount";
|
||||||
|
protected static final String KEY_CLIENT_DEST_INDEX = "client.destIndex";
|
||||||
|
|
||||||
protected Properties sysTestSettings = new Properties();
|
protected Properties sysTestSettings = new Properties();
|
||||||
protected Properties samplerSettings = new Properties();
|
protected Properties samplerSettings = new Properties();
|
||||||
|
@ -34,17 +40,20 @@ public abstract class JmsClientSystemSupport {
|
||||||
protected PerfMeasurementTool performanceSampler;
|
protected PerfMeasurementTool performanceSampler;
|
||||||
|
|
||||||
protected int numClients = 1;
|
protected int numClients = 1;
|
||||||
|
protected int totalDests = 1;
|
||||||
|
protected String destDistro = DEST_DISTRO_ALL;
|
||||||
|
|
||||||
public void runSystemTest() {
|
public void runSystemTest() {
|
||||||
// Create a new copy of the settings to ensure immutability
|
|
||||||
final Properties clientSettings = getJmsClientSettings();
|
|
||||||
|
|
||||||
// Create performance sampler
|
// Create performance sampler
|
||||||
performanceSampler = new PerfMeasurementTool();
|
performanceSampler = new PerfMeasurementTool();
|
||||||
performanceSampler.setSamplerSettings(samplerSettings);
|
performanceSampler.setSamplerSettings(samplerSettings);
|
||||||
|
|
||||||
clientThreadGroup = new ThreadGroup(getThreadGroupName());
|
clientThreadGroup = new ThreadGroup(getThreadGroupName());
|
||||||
for (int i=0; i<numClients; i++) {
|
for (int i=0; i<getNumClients(); i++) {
|
||||||
|
final Properties clientSettings = new Properties();
|
||||||
|
clientSettings.putAll(getJmsClientSettings());
|
||||||
|
distributeDestinations(getDestDistro(), i, getNumClients(), getTotalDests(), clientSettings);
|
||||||
|
|
||||||
final String clientName = getClientName() + i;
|
final String clientName = getClientName() + i;
|
||||||
Thread t = new Thread(clientThreadGroup, new Runnable() {
|
Thread t = new Thread(clientThreadGroup, new Runnable() {
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -58,6 +67,51 @@ public abstract class JmsClientSystemSupport {
|
||||||
performanceSampler.startSampler();
|
performanceSampler.startSampler();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void distributeDestinations(String distroType, int clientIndex, int numClients, int numDests, Properties clientSettings) {
|
||||||
|
if (distroType.equalsIgnoreCase(DEST_DISTRO_ALL)) {
|
||||||
|
clientSettings.setProperty(KEY_CLIENT_DEST_COUNT, String.valueOf(numDests));
|
||||||
|
clientSettings.setProperty(KEY_CLIENT_DEST_INDEX, "0");
|
||||||
|
} else if (distroType.equalsIgnoreCase(DEST_DISTRO_EQUAL)) {
|
||||||
|
int destPerClient = (numDests / numClients);
|
||||||
|
// There are equal or more destinations per client
|
||||||
|
if (destPerClient > 0) {
|
||||||
|
clientSettings.setProperty(KEY_CLIENT_DEST_COUNT, String.valueOf(destPerClient));
|
||||||
|
clientSettings.setProperty(KEY_CLIENT_DEST_INDEX, String.valueOf(destPerClient * clientIndex));
|
||||||
|
|
||||||
|
// If there are more clients than destinations, share destinations per client
|
||||||
|
} else {
|
||||||
|
clientSettings.setProperty(KEY_CLIENT_DEST_COUNT, "1"); // At most one destination per client
|
||||||
|
clientSettings.setProperty(KEY_CLIENT_DEST_INDEX, String.valueOf(clientIndex % numDests));
|
||||||
|
}
|
||||||
|
} else if (distroType.equalsIgnoreCase(DEST_DISTRO_DIVIDE)) {
|
||||||
|
int destPerClient = (numDests / numClients);
|
||||||
|
// There are equal or more destinations per client
|
||||||
|
if (destPerClient > 0) {
|
||||||
|
int remain = numDests % numClients;
|
||||||
|
int nextIndex;
|
||||||
|
if (clientIndex < remain) {
|
||||||
|
destPerClient++;
|
||||||
|
nextIndex = clientIndex * destPerClient;
|
||||||
|
} else {
|
||||||
|
nextIndex = (clientIndex * destPerClient) + remain;
|
||||||
|
}
|
||||||
|
|
||||||
|
clientSettings.setProperty(KEY_CLIENT_DEST_COUNT, String.valueOf(destPerClient));
|
||||||
|
clientSettings.setProperty(KEY_CLIENT_DEST_INDEX, String.valueOf(nextIndex));
|
||||||
|
|
||||||
|
// If there are more clients than destinations, share destinations per client
|
||||||
|
} else {
|
||||||
|
clientSettings.setProperty(KEY_CLIENT_DEST_COUNT, "1"); // At most one destination per client
|
||||||
|
clientSettings.setProperty(KEY_CLIENT_DEST_INDEX, String.valueOf(clientIndex % numDests));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send to all for unknown behavior
|
||||||
|
} else {
|
||||||
|
clientSettings.setProperty(KEY_CLIENT_DEST_COUNT, String.valueOf(numDests));
|
||||||
|
clientSettings.setProperty(KEY_CLIENT_DEST_INDEX, "0");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public abstract void runJmsClient(String clientName, Properties clientSettings);
|
public abstract void runJmsClient(String clientName, Properties clientSettings);
|
||||||
|
|
||||||
public String getClientName() {
|
public String getClientName() {
|
||||||
|
@ -139,4 +193,20 @@ public abstract class JmsClientSystemSupport {
|
||||||
public void setNumClients(int numClients) {
|
public void setNumClients(int numClients) {
|
||||||
this.numClients = numClients;
|
this.numClients = numClients;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getDestDistro() {
|
||||||
|
return destDistro;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDestDistro(String destDistro) {
|
||||||
|
this.destDistro = destDistro;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getTotalDests() {
|
||||||
|
return totalDests;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTotalDests(int totalDests) {
|
||||||
|
this.totalDests = totalDests;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,7 @@ public class JmsConsumerSystem extends JmsClientSystemSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws JMSException {
|
public static void main(String[] args) throws JMSException {
|
||||||
String[] options = new String[22];
|
String[] options = new String[24];
|
||||||
options[0] = "-Dsampler.duration=60000"; // 1 min
|
options[0] = "-Dsampler.duration=60000"; // 1 min
|
||||||
options[1] = "-Dsampler.interval=5000"; // 5 secs
|
options[1] = "-Dsampler.interval=5000"; // 5 secs
|
||||||
options[2] = "-Dsampler.rampUpTime=10000"; // 10 secs
|
options[2] = "-Dsampler.rampUpTime=10000"; // 10 secs
|
||||||
|
@ -79,6 +79,8 @@ public class JmsConsumerSystem extends JmsClientSystemSupport {
|
||||||
options[20] = "-Dfactory.useRetroactive=false";
|
options[20] = "-Dfactory.useRetroactive=false";
|
||||||
|
|
||||||
options[21] = "-DsysTest.numClients=5";
|
options[21] = "-DsysTest.numClients=5";
|
||||||
|
options[22] = "-DsysTest.totalDests=5";
|
||||||
|
options[23] = "-DsysTest.destDistro=all";
|
||||||
|
|
||||||
args = options;
|
args = options;
|
||||||
|
|
||||||
|
|
|
@ -51,8 +51,8 @@ public class JmsProducerSystem extends JmsClientSystemSupport {
|
||||||
return "JMS Producer Thread Group";
|
return "JMS Producer Thread Group";
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws JMSException {
|
public static void main(String[] args) {
|
||||||
String[] options = new String[17];
|
String[] options = new String[19];
|
||||||
options[0] = "-Dsampler.duration=60000"; // 1 min
|
options[0] = "-Dsampler.duration=60000"; // 1 min
|
||||||
options[1] = "-Dsampler.interval=5000"; // 5 secs
|
options[1] = "-Dsampler.interval=5000"; // 5 secs
|
||||||
options[2] = "-Dsampler.rampUpTime=10000"; // 10 secs
|
options[2] = "-Dsampler.rampUpTime=10000"; // 10 secs
|
||||||
|
@ -74,6 +74,8 @@ public class JmsProducerSystem extends JmsClientSystemSupport {
|
||||||
options[15] = "-Dfactory.asyncSend=true";
|
options[15] = "-Dfactory.asyncSend=true";
|
||||||
|
|
||||||
options[16] = "-DsysTest.numClients=5";
|
options[16] = "-DsysTest.numClients=5";
|
||||||
|
options[17] = "-DsysTest.totalDests=5";
|
||||||
|
options[18] = "-DsysTest.destDistro=all";
|
||||||
|
|
||||||
args = options;
|
args = options;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue