This closes #529
This commit is contained in:
commit
ea5fc16c68
|
@ -85,6 +85,10 @@ public class Create extends InputAbstract {
|
||||||
public static final String ETC_CONNECTOR_SETTINGS_TXT = "etc/connector-settings.txt";
|
public static final String ETC_CONNECTOR_SETTINGS_TXT = "etc/connector-settings.txt";
|
||||||
public static final String ETC_BOOTSTRAP_WEB_SETTINGS_TXT = "etc/bootstrap-web-settings.txt";
|
public static final String ETC_BOOTSTRAP_WEB_SETTINGS_TXT = "etc/bootstrap-web-settings.txt";
|
||||||
public static final String ETC_JOURNAL_BUFFER_SETTINGS = "etc/journal-buffer-settings.txt";
|
public static final String ETC_JOURNAL_BUFFER_SETTINGS = "etc/journal-buffer-settings.txt";
|
||||||
|
public static final String ETC_AMQP_ACCEPTOR_TXT = "etc/amqp-acceptor.txt";
|
||||||
|
public static final String ETC_HORNETQ_ACCEPTOR_TXT = "etc/hornetq-acceptor.txt";
|
||||||
|
public static final String ETC_MQTT_ACCEPTOR_TXT = "etc/mqtt-acceptor.txt";
|
||||||
|
public static final String ETC_STOMP_ACCEPTOR_TXT = "etc/stomp-acceptor.txt";
|
||||||
|
|
||||||
@Arguments(description = "The instance directory to hold the broker's configuration and data. Path must be writable.", required = true)
|
@Arguments(description = "The instance directory to hold the broker's configuration and data. Path must be writable.", required = true)
|
||||||
File directory;
|
File directory;
|
||||||
|
@ -92,10 +96,13 @@ public class Create extends InputAbstract {
|
||||||
@Option(name = "--host", description = "The host name of the broker (Default: 0.0.0.0 or input if clustered)")
|
@Option(name = "--host", description = "The host name of the broker (Default: 0.0.0.0 or input if clustered)")
|
||||||
String host;
|
String host;
|
||||||
|
|
||||||
|
@Option(name = "--default-port", description = "The port number to use for the main 'artemis' acceptor (Default: 61616)")
|
||||||
|
int defaultPort = DEFAULT_PORT;
|
||||||
|
|
||||||
@Option(name = "--name", description = "The name of the broker (Default: same as host)")
|
@Option(name = "--name", description = "The name of the broker (Default: same as host)")
|
||||||
String name;
|
String name;
|
||||||
|
|
||||||
@Option(name = "--port-offset", description = "Off sets the default ports")
|
@Option(name = "--port-offset", description = "Off sets the ports of every acceptor")
|
||||||
int portOffset;
|
int portOffset;
|
||||||
|
|
||||||
@Option(name = "--force", description = "Overwrite configuration at destination directory")
|
@Option(name = "--force", description = "Overwrite configuration at destination directory")
|
||||||
|
@ -176,6 +183,18 @@ public class Create extends InputAbstract {
|
||||||
@Option(name = "--disable-persistence", description = "Disable message persistence to the journal")
|
@Option(name = "--disable-persistence", description = "Disable message persistence to the journal")
|
||||||
boolean disablePersistence;
|
boolean disablePersistence;
|
||||||
|
|
||||||
|
@Option(name = "--no-amqp-acceptor", description = "Disable the AMQP specific acceptor.")
|
||||||
|
boolean noAmqpAcceptor;
|
||||||
|
|
||||||
|
@Option(name = "--no-mqtt-acceptor", description = "Disable the MQTT specific acceptor.")
|
||||||
|
boolean noMqttAcceptor;
|
||||||
|
|
||||||
|
@Option(name = "--no-stomp-acceptor", description = "Disable the STOMP specific acceptor.")
|
||||||
|
boolean noStompAcceptor;
|
||||||
|
|
||||||
|
@Option(name = "--no-hornetq-acceptor", description = "Disable the HornetQ specific acceptor.")
|
||||||
|
boolean noHornetQAcceptor;
|
||||||
|
|
||||||
boolean IS_WINDOWS;
|
boolean IS_WINDOWS;
|
||||||
|
|
||||||
boolean IS_CYGWIN;
|
boolean IS_CYGWIN;
|
||||||
|
@ -500,7 +519,7 @@ public class Create extends InputAbstract {
|
||||||
}
|
}
|
||||||
|
|
||||||
filters.put("${user}", System.getProperty("user.name", ""));
|
filters.put("${user}", System.getProperty("user.name", ""));
|
||||||
filters.put("${default.port}", String.valueOf(DEFAULT_PORT + portOffset));
|
filters.put("${default.port}", String.valueOf(defaultPort + portOffset));
|
||||||
filters.put("${amqp.port}", String.valueOf(AMQP_PORT + portOffset));
|
filters.put("${amqp.port}", String.valueOf(AMQP_PORT + portOffset));
|
||||||
filters.put("${stomp.port}", String.valueOf(STOMP_PORT + portOffset));
|
filters.put("${stomp.port}", String.valueOf(STOMP_PORT + portOffset));
|
||||||
filters.put("${hq.port}", String.valueOf(HQ_PORT + portOffset));
|
filters.put("${hq.port}", String.valueOf(HQ_PORT + portOffset));
|
||||||
|
@ -602,6 +621,34 @@ public class Create extends InputAbstract {
|
||||||
filters.put("${bootstrap-web-settings}", applyFilters(readTextFile(ETC_BOOTSTRAP_WEB_SETTINGS_TXT), filters));
|
filters.put("${bootstrap-web-settings}", applyFilters(readTextFile(ETC_BOOTSTRAP_WEB_SETTINGS_TXT), filters));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (noAmqpAcceptor) {
|
||||||
|
filters.put("${amqp-acceptor}", "");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
filters.put("${amqp-acceptor}", applyFilters(readTextFile(ETC_AMQP_ACCEPTOR_TXT), filters));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (noMqttAcceptor) {
|
||||||
|
filters.put("${mqtt-acceptor}", "");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
filters.put("${mqtt-acceptor}", applyFilters(readTextFile(ETC_MQTT_ACCEPTOR_TXT), filters));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (noStompAcceptor) {
|
||||||
|
filters.put("${stomp-acceptor}", "");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
filters.put("${stomp-acceptor}", applyFilters(readTextFile(ETC_STOMP_ACCEPTOR_TXT), filters));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (noHornetQAcceptor) {
|
||||||
|
filters.put("${hornetq-acceptor}", "");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
filters.put("${hornetq-acceptor}", applyFilters(readTextFile(ETC_HORNETQ_ACCEPTOR_TXT), filters));
|
||||||
|
}
|
||||||
|
|
||||||
performAutoTune(filters, aio, dataFolder);
|
performAutoTune(filters, aio, dataFolder);
|
||||||
|
|
||||||
write(ETC_BROKER_XML, filters, false);
|
write(ETC_BROKER_XML, filters, false);
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
|
||||||
|
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
||||||
|
<acceptor name="amqp">tcp://${host}:${amqp.port}?protocols=AMQP</acceptor>
|
|
@ -54,18 +54,7 @@ ${connector-config.settings}
|
||||||
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
|
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
|
||||||
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
|
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
|
||||||
<acceptor name="artemis">tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
|
<acceptor name="artemis">tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
|
||||||
|
${amqp-acceptor}${stomp-acceptor}${hornetq-acceptor}${mqtt-acceptor}
|
||||||
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
|
||||||
<acceptor name="amqp">tcp://${host}:${amqp.port}?protocols=AMQP</acceptor>
|
|
||||||
|
|
||||||
<!-- STOMP Acceptor. -->
|
|
||||||
<acceptor name="stomp">tcp://${host}:${stomp.port}?protocols=STOMP</acceptor>
|
|
||||||
|
|
||||||
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
|
|
||||||
<acceptor name="hornetq">tcp://${host}:${hq.port}?protocols=HORNETQ,STOMP</acceptor>
|
|
||||||
|
|
||||||
<!-- MQTT Acceptor -->
|
|
||||||
<acceptor name="mqtt">tcp://${host}:${mqtt.port}?protocols=MQTT</acceptor>
|
|
||||||
</acceptors>
|
</acceptors>
|
||||||
|
|
||||||
${cluster-security.settings}${cluster.settings}${replicated.settings}${shared-store.settings}
|
${cluster-security.settings}${cluster.settings}${replicated.settings}${shared-store.settings}
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
|
||||||
|
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
|
||||||
|
<acceptor name="hornetq">tcp://${host}:${hq.port}?protocols=HORNETQ,STOMP</acceptor>
|
|
@ -0,0 +1,3 @@
|
||||||
|
|
||||||
|
<!-- MQTT Acceptor -->
|
||||||
|
<acceptor name="mqtt">tcp://${host}:${mqtt.port}?protocols=MQTT</acceptor>
|
|
@ -0,0 +1,3 @@
|
||||||
|
|
||||||
|
<!-- STOMP Acceptor. -->
|
||||||
|
<acceptor name="stomp">tcp://${host}:${stomp.port}?protocols=STOMP</acceptor>
|
|
@ -50,6 +50,10 @@ public class StreamClassPathTest {
|
||||||
openStream(Create.ETC_CONNECTOR_SETTINGS_TXT);
|
openStream(Create.ETC_CONNECTOR_SETTINGS_TXT);
|
||||||
openStream(Create.ETC_BOOTSTRAP_WEB_SETTINGS_TXT);
|
openStream(Create.ETC_BOOTSTRAP_WEB_SETTINGS_TXT);
|
||||||
openStream(Create.ETC_JOURNAL_BUFFER_SETTINGS);
|
openStream(Create.ETC_JOURNAL_BUFFER_SETTINGS);
|
||||||
|
openStream(Create.ETC_AMQP_ACCEPTOR_TXT);
|
||||||
|
openStream(Create.ETC_MQTT_ACCEPTOR_TXT);
|
||||||
|
openStream(Create.ETC_HORNETQ_ACCEPTOR_TXT);
|
||||||
|
openStream(Create.ETC_STOMP_ACCEPTOR_TXT);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void openStream(String source) throws Exception {
|
private void openStream(String source) throws Exception {
|
||||||
|
|
|
@ -591,7 +591,7 @@ public abstract class ActiveMQTestBase extends Assert {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} while (i++ <= 30 && hasValue);
|
} while (i++ <= 50 && hasValue);
|
||||||
|
|
||||||
for (WeakReference<?> ref : references) {
|
for (WeakReference<?> ref : references) {
|
||||||
Assert.assertNull(ref.get());
|
Assert.assertNull(ref.get());
|
||||||
|
|
|
@ -332,7 +332,7 @@ public class FailoverTest extends FailoverTestBase {
|
||||||
// https://issues.jboss.org/browse/HORNETQ-685
|
// https://issues.jboss.org/browse/HORNETQ-685
|
||||||
@Test
|
@Test
|
||||||
public void testTimeoutOnFailoverTransactionCommit() throws Exception {
|
public void testTimeoutOnFailoverTransactionCommit() throws Exception {
|
||||||
locator.setCallTimeout(2000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(-1);
|
locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(-1);
|
||||||
|
|
||||||
((InVMNodeManager) nodeManager).failoverPause = 5000L;
|
((InVMNodeManager) nodeManager).failoverPause = 5000L;
|
||||||
|
|
||||||
|
|
|
@ -900,7 +900,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
|
|
||||||
ClientProducer producer1 = session.createProducer(random1);
|
ClientProducer producer1 = session.createProducer(random1);
|
||||||
ClientProducer producer2 = session.createProducer(random2);
|
ClientProducer producer2 = session.createProducer(random2);
|
||||||
ClientMessage message = session.createMessage(false);
|
ClientMessage message = session.createMessage(true);
|
||||||
producer1.send(message);
|
producer1.send(message);
|
||||||
producer2.send(message);
|
producer2.send(message);
|
||||||
|
|
||||||
|
|
|
@ -347,12 +347,12 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
|
||||||
Assert.assertEquals(TEST_SIZE * 2, messageCount);
|
Assert.assertEquals(TEST_SIZE * 2, messageCount);
|
||||||
|
|
||||||
for (int i = 0; i < TEST_SIZE; i++) {
|
for (int i = 0; i < TEST_SIZE; i++) {
|
||||||
ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
|
ClientMessage clientMessage = consumers[0].getConsumer().receive(1000);
|
||||||
Assert.assertNotNull(clientMessage);
|
Assert.assertNotNull(clientMessage);
|
||||||
IntegrationTestLogger.LOGGER.info("Received: " + clientMessage);
|
IntegrationTestLogger.LOGGER.info("Received: " + clientMessage);
|
||||||
clientMessage.acknowledge();
|
clientMessage.acknowledge();
|
||||||
|
|
||||||
clientMessage = consumers[1].getConsumer().receive(250);
|
clientMessage = consumers[1].getConsumer().receive(1000);
|
||||||
Assert.assertNotNull(clientMessage);
|
Assert.assertNotNull(clientMessage);
|
||||||
IntegrationTestLogger.LOGGER.info("Received: " + clientMessage);
|
IntegrationTestLogger.LOGGER.info("Received: " + clientMessage);
|
||||||
clientMessage.acknowledge();
|
clientMessage.acknowledge();
|
||||||
|
|
Loading…
Reference in New Issue