From 0bd99dfff7c297953f03fa9b06f27305fc1a1341 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 4 Apr 2018 17:15:38 -0400 Subject: [PATCH 1/2] ARTEMIS-1779 Fixing CoverityScan finding There was a logic to validate if member is null. Which seemed a bit weird considering the else would throw a NPE. Fixing it proactively based on Coverity-scan findings. --- .../core/server/cluster/impl/BridgeImpl.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 5e0eb17584..e40bc467f9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -1014,17 +1014,18 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled // To be called by the topology update // This logic will be updated on the cluster connection protected void nodeUP(TopologyMember member, boolean last) { - ClientSessionInternal sessionToUse = session; - RemotingConnection connectionToUse = sessionToUse != null ? sessionToUse.getConnection() : null; - - if (member != null && this.targetNodeID != null && this.targetNodeID.equals(member.getNodeId())) { - // this could be an update of the topology say after a backup started - BridgeImpl.this.targetNode = member; - } else { - // we don't need synchronization here, but we need to make sure we won't get a NPE on races - if (connectionToUse != null && member.isMember(connectionToUse)) { - this.targetNode = member; - this.targetNodeID = member.getNodeId(); + if (member != null) { + ClientSessionInternal sessionToUse = session; + RemotingConnection connectionToUse = sessionToUse != null ? sessionToUse.getConnection() : null; + if (this.targetNodeID != null && this.targetNodeID.equals(member.getNodeId())) { + // this could be an update of the topology say after a backup started + BridgeImpl.this.targetNode = member; + } else { + // we don't need synchronization here, but we need to make sure we won't get a NPE on races + if (connectionToUse != null && member.isMember(connectionToUse)) { + this.targetNode = member; + this.targetNodeID = member.getNodeId(); + } } } From dae1b7de1cf411994b550715c2fce143917df6f6 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 4 Apr 2018 15:19:02 -0400 Subject: [PATCH 2/2] ARTEMIS-1785 Producer CLI would throw NPEs if using Text Messages --- .../cli/commands/messages/ConsumerThread.java | 23 +++++-- .../cli/commands/messages/Producer.java | 7 +- .../cli/commands/messages/ProducerThread.java | 35 +++++++++- .../cli/commands/{util => messages}/demo.txt | 0 .../apache/activemq/cli/test/ArtemisTest.java | 4 ++ .../cli/test/StreamClassPathTest.java | 69 ++++++++++--------- 6 files changed, 95 insertions(+), 43 deletions(-) rename artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/{util => messages}/demo.txt (100%) diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java index ecffa349f1..ab3640bd62 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java @@ -21,6 +21,7 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.QueueBrowser; import javax.jms.Session; @@ -157,11 +158,23 @@ public class ConsumerThread extends Thread { System.out.println("Received " + count); } } - if (bytesAsText && (msg instanceof BytesMessage)) { - long length = ((BytesMessage) msg).getBodyLength(); - byte[] bytes = new byte[(int) length]; - ((BytesMessage) msg).readBytes(bytes); - System.out.println("Message:" + msg); + if (verbose) { + if (bytesAsText && (msg instanceof BytesMessage)) { + long length = ((BytesMessage) msg).getBodyLength(); + byte[] bytes = new byte[(int) length]; + ((BytesMessage) msg).readBytes(bytes); + System.out.println("Received a message with " + bytes.length); + } + + if (msg instanceof TextMessage) { + String text = ((TextMessage) msg).getText(); + System.out.println("Received text sized at " + text.length()); + } + + if (msg instanceof ObjectMessage) { + Object obj = ((ObjectMessage) msg).getObject(); + System.out.println("Received object " + obj.toString().length()); + } } received++; } else { diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java index e077fb0adb..3cb5effb04 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java @@ -29,6 +29,8 @@ import org.apache.activemq.artemis.cli.commands.ActionContext; @Command(name = "producer", description = "It will send messages to an instance") public class Producer extends DestAbstract { + public static final String DEMO_TEXT = "demo.txt"; + @Option(name = "--non-persistent", description = "It will send messages non persistently") boolean nonpersistent = false; @@ -38,6 +40,9 @@ public class Producer extends DestAbstract { @Option(name = "--text-size", description = "Size of each textMessage (The producer will use text message on this case)") int textMessageSize; + @Option(name = "--object-size", description = "Size of each ObjectMessage (The producer will use object mesasge on this case)") + int objectSize; + @Option(name = "--msgttl", description = "TTL for each message") long msgTTL = 0L; @@ -63,7 +68,7 @@ public class Producer extends DestAbstract { threadsArray[i] = new ProducerThread(session, dest, i); threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent). - setMessageSize(messageSize).setTextMessageSize(textMessageSize). + setMessageSize(messageSize).setTextMessageSize(textMessageSize).setObjectSize(objectSize). setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize). setMessageCount(messageCount); } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java index 9a4c1a715d..6e9fc5c4d6 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java @@ -44,6 +44,7 @@ public class ProducerThread extends Thread { boolean persistent = true; int messageSize = 0; int textMessageSize; + int objectSize; long msgTTL = 0L; String msgGroupID = null; int transactionBatchSize; @@ -150,9 +151,23 @@ public class ProducerThread extends Thread { answer = session.createBytesMessage(); ((BytesMessage) answer).writeBytes(payload); } else { - if (textMessageSize > 0) { + if (textMessageSize > 0 || objectSize > 0) { + + if (objectSize > 0) { + textMessageSize = objectSize; + } if (messageText == null) { - messageText = readInputStream(getClass().getResourceAsStream("demo.txt"), textMessageSize, i); + String read = readInputStream(getClass().getResourceAsStream(Producer.DEMO_TEXT), textMessageSize, i); + if (read.length() == textMessageSize) { + messageText = read; + } else { + StringBuffer buffer = new StringBuffer(read); + while (buffer.length() < textMessageSize) { + buffer.append(read); + } + messageText = buffer.toString(); + } + } } else if (payloadUrl != null) { messageText = readInputStream(new URL(payloadUrl).openStream(), -1, i); @@ -161,7 +176,12 @@ public class ProducerThread extends Thread { } else { messageText = createDefaultMessage(i); } - answer = session.createTextMessage(messageText); + + if (objectSize > 0) { + answer = session.createObjectMessage(messageText); + } else { + answer = session.createTextMessage(messageText); + } } if ((msgGroupID != null) && (!msgGroupID.isEmpty())) { answer.setStringProperty("JMSXGroupID", msgGroupID); @@ -341,4 +361,13 @@ public class ProducerThread extends Thread { this.verbose = verbose; return this; } + + public int getObjectSize() { + return objectSize; + } + + public ProducerThread setObjectSize(int objectSize) { + this.objectSize = objectSize; + return this; + } } diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/util/demo.txt b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/messages/demo.txt similarity index 100% rename from artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/util/demo.txt rename to artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/messages/demo.txt diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java index 2be6d7ff71..526f7dff85 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java @@ -583,6 +583,10 @@ public class ArtemisTest extends CliTestBase { assertEquals(Integer.valueOf(100), Artemis.internalExecute("producer", "--message-count", "100", "--user", "admin", "--password", "admin")); assertEquals(Integer.valueOf(100), Artemis.internalExecute("consumer", "--break-on-null", "--receive-timeout", "100", "--user", "admin", "--password", "admin")); + assertEquals(Integer.valueOf(10), Artemis.internalExecute("producer", "--text-size", "500", "--message-count", "10", "--user", "admin", "--password", "admin")); + assertEquals(Integer.valueOf(10), Artemis.internalExecute("consumer", "--break-on-null", "--receive-timeout", "100", "--user", "admin", "--password", "admin")); + assertEquals(Integer.valueOf(10), Artemis.internalExecute("producer", "--message-size", "500", "--message-count", "10", "--user", "admin", "--password", "admin")); + assertEquals(Integer.valueOf(10), Artemis.internalExecute("consumer", "--break-on-null", "--receive-timeout", "100", "--user", "admin", "--password", "admin")); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = cf.createConnection("admin", "admin"); diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java index a11e0949c6..c348476951 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java @@ -20,6 +20,7 @@ package org.apache.activemq.cli.test; import java.io.InputStream; import org.apache.activemq.artemis.cli.commands.Create; +import org.apache.activemq.artemis.cli.commands.messages.Producer; import org.junit.Assert; import org.junit.Test; @@ -30,43 +31,43 @@ public class StreamClassPathTest { */ @Test public void testFindStreams() throws Exception { - openStream(Create.BIN_ARTEMIS_CMD); - openStream(Create.BIN_ARTEMIS_SERVICE_EXE); - openStream(Create.BIN_ARTEMIS_SERVICE_XML); - openStream("etc/" + Create.ETC_ARTEMIS_PROFILE_CMD); - openStream(Create.BIN_ARTEMIS); - openStream(Create.BIN_ARTEMIS_SERVICE); - openStream("etc/" + Create.ETC_ARTEMIS_PROFILE); - openStream("etc/" + Create.ETC_LOGGING_PROPERTIES); - openStream("etc/" + Create.ETC_BOOTSTRAP_XML); - openStream("etc/" + Create.ETC_MANAGEMENT_XML); - openStream("etc/" + Create.ETC_BROKER_XML); - openStream("etc/" + Create.ETC_ARTEMIS_ROLES_PROPERTIES); - openStream("etc/" + Create.ETC_ARTEMIS_USERS_PROPERTIES); - openStream(Create.ETC_REPLICATED_SETTINGS_TXT); - openStream(Create.ETC_REPLICATED_SETTINGS_TXT); - openStream(Create.ETC_SHARED_STORE_SETTINGS_TXT); - openStream(Create.ETC_CLUSTER_SECURITY_SETTINGS_TXT); - openStream(Create.ETC_CLUSTER_SETTINGS_TXT); - openStream(Create.ETC_CONNECTOR_SETTINGS_TXT); - openStream(Create.ETC_BOOTSTRAP_WEB_SETTINGS_TXT); - openStream(Create.ETC_JOURNAL_BUFFER_SETTINGS); - openStream(Create.ETC_AMQP_ACCEPTOR_TXT); - openStream(Create.ETC_MQTT_ACCEPTOR_TXT); - openStream(Create.ETC_HORNETQ_ACCEPTOR_TXT); - openStream(Create.ETC_STOMP_ACCEPTOR_TXT); - openStream(Create.ETC_PING_TXT); - openStream(Create.ETC_COMMENTED_PING_TXT); - openStream(Create.ETC_GLOBAL_MAX_SPECIFIED_TXT); - openStream(Create.ETC_GLOBAL_MAX_DEFAULT_TXT); - openStream("etc/" + Create.ETC_JOLOKIA_ACCESS_XML); - openStream(Create.ETC_DATABASE_STORE_TXT); + testStream(Create.class, Create.BIN_ARTEMIS_CMD); + testStream(Create.class, Create.BIN_ARTEMIS_SERVICE_EXE); + testStream(Create.class, Create.BIN_ARTEMIS_SERVICE_XML); + testStream(Create.class, "etc/" + Create.ETC_ARTEMIS_PROFILE_CMD); + testStream(Create.class, Create.BIN_ARTEMIS); + testStream(Create.class, Create.BIN_ARTEMIS_SERVICE); + testStream(Create.class, "etc/" + Create.ETC_ARTEMIS_PROFILE); + testStream(Create.class, "etc/" + Create.ETC_LOGGING_PROPERTIES); + testStream(Create.class, "etc/" + Create.ETC_BOOTSTRAP_XML); + testStream(Create.class, "etc/" + Create.ETC_MANAGEMENT_XML); + testStream(Create.class, "etc/" + Create.ETC_BROKER_XML); + testStream(Create.class, "etc/" + Create.ETC_ARTEMIS_ROLES_PROPERTIES); + testStream(Create.class, "etc/" + Create.ETC_ARTEMIS_USERS_PROPERTIES); + testStream(Create.class, Create.ETC_REPLICATED_SETTINGS_TXT); + testStream(Create.class, Create.ETC_REPLICATED_SETTINGS_TXT); + testStream(Create.class, Create.ETC_SHARED_STORE_SETTINGS_TXT); + testStream(Create.class, Create.ETC_CLUSTER_SECURITY_SETTINGS_TXT); + testStream(Create.class, Create.ETC_CLUSTER_SETTINGS_TXT); + testStream(Create.class, Create.ETC_CONNECTOR_SETTINGS_TXT); + testStream(Create.class, Create.ETC_BOOTSTRAP_WEB_SETTINGS_TXT); + testStream(Create.class, Create.ETC_JOURNAL_BUFFER_SETTINGS); + testStream(Create.class, Create.ETC_AMQP_ACCEPTOR_TXT); + testStream(Create.class, Create.ETC_MQTT_ACCEPTOR_TXT); + testStream(Create.class, Create.ETC_HORNETQ_ACCEPTOR_TXT); + testStream(Create.class, Create.ETC_STOMP_ACCEPTOR_TXT); + testStream(Create.class, Create.ETC_PING_TXT); + testStream(Create.class, Create.ETC_COMMENTED_PING_TXT); + testStream(Create.class, Create.ETC_GLOBAL_MAX_SPECIFIED_TXT); + testStream(Create.class, Create.ETC_GLOBAL_MAX_DEFAULT_TXT); + testStream(Create.class, "etc/" + Create.ETC_JOLOKIA_ACCESS_XML); + testStream(Create.class, Create.ETC_DATABASE_STORE_TXT); + testStream(Producer.class, Producer.DEMO_TEXT); } - private void openStream(String source) throws Exception { - Create create = new Create(); - InputStream in = create.openStream(source); + private void testStream(Class clazz, String source) throws Exception { + InputStream in = clazz.getResourceAsStream(source); Assert.assertNotNull(source + " not found", in); in.close(); }