This closes #813

This commit is contained in:
Clebert Suconic 2016-09-30 13:19:08 -04:00
commit 8cd677b206
7 changed files with 61 additions and 61 deletions

View File

@ -326,17 +326,17 @@ public final class XmlDataImporter extends ActionAbstract {
// Get the ID of the queues involved so the message can be routed properly. This is done because we cannot
// send directly to a queue, we have to send to an address instead but not all the queues related to the
// address may need the message
ClientRequestor requestor = new ClientRequestor(managementSession, "jms.queue.activemq.management");
ClientMessage managementMessage = managementSession.createMessage(false);
ManagementHelper.putAttribute(managementMessage, "core.queue." + queue, "ID");
managementSession.start();
if (logger.isDebugEnabled()) {
logger.debug("Requesting ID for: " + queue);
try (ClientRequestor requestor = new ClientRequestor(managementSession, "jms.queue.activemq.management")) {
ClientMessage managementMessage = managementSession.createMessage(false);
ManagementHelper.putAttribute(managementMessage, "core.queue." + queue, "ID");
managementSession.start();
if (logger.isDebugEnabled()) {
logger.debug("Requesting ID for: " + queue);
}
ClientMessage reply = requestor.request(managementMessage);
Number idObject = (Number) ManagementHelper.getResult(reply);
queueID = idObject.longValue();
}
ClientMessage reply = requestor.request(managementMessage);
Number idObject = (Number) ManagementHelper.getResult(reply);
queueID = idObject.longValue();
requestor.close();
if (logger.isDebugEnabled()) {
logger.debug("ID for " + queue + " is: " + queueID);
}
@ -825,21 +825,20 @@ public final class XmlDataImporter extends ActionAbstract {
reader.next();
}
ClientRequestor requestor = new ClientRequestor(managementSession, "jms.queue.activemq.management");
ClientMessage managementMessage = managementSession.createMessage(false);
ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.JMS_SERVER, "createConnectionFactory", name, Boolean.parseBoolean(ha), discoveryGroupName.length() > 0, Integer.parseInt(type), connectors, entries, clientId, Long.parseLong(clientFailureCheckPeriod), Long.parseLong(connectionTtl), Long.parseLong(callTimeout), Long.parseLong(callFailoverTimeout), Integer.parseInt(minLargeMessageSize), Boolean.parseBoolean(compressLargeMessages), Integer.parseInt(consumerWindowSize), Integer.parseInt(consumerMaxRate), Integer.parseInt(confirmationWindowSize), Integer.parseInt(producerWindowSize), Integer.parseInt(producerMaxRate), Boolean.parseBoolean(blockOnAcknowledge), Boolean.parseBoolean(blockOnDurableSend), Boolean.parseBoolean(blockOnNonDurableSend), Boolean.parseBoolean(autoGroup), Boolean.parseBoolean(preacknowledge), loadBalancingPolicyClassName, Integer.parseInt(transactionBatchSize), Integer.parseInt(dupsOkBatchSize), Boolean.parseBoolean(useGlobalPools), Integer.parseInt(scheduledThreadMaxPoolSize), Integer.parseInt(threadMaxPoolSize), Long.parseLong(retryInterval), Double.parseDouble(retryIntervalMultiplier), Long.parseLong(maxRetryInterval), Integer.parseInt(reconnectAttempts), Boolean.parseBoolean(failoverOnInitialConnection), groupId);
//Boolean.parseBoolean(cacheLargeMessagesClient));
managementSession.start();
ClientMessage reply = requestor.request(managementMessage);
if (ManagementHelper.hasOperationSucceeded(reply)) {
if (logger.isDebugEnabled()) {
logger.debug("Created connection factory " + name);
try (ClientRequestor requestor = new ClientRequestor(managementSession, "jms.queue.activemq.management")) {
ClientMessage managementMessage = managementSession.createMessage(false);
ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.JMS_SERVER, "createConnectionFactory", name, Boolean.parseBoolean(ha), discoveryGroupName.length() > 0, Integer.parseInt(type), connectors, entries, clientId, Long.parseLong(clientFailureCheckPeriod), Long.parseLong(connectionTtl), Long.parseLong(callTimeout), Long.parseLong(callFailoverTimeout), Integer.parseInt(minLargeMessageSize), Boolean.parseBoolean(compressLargeMessages), Integer.parseInt(consumerWindowSize), Integer.parseInt(consumerMaxRate), Integer.parseInt(confirmationWindowSize), Integer.parseInt(producerWindowSize), Integer.parseInt(producerMaxRate), Boolean.parseBoolean(blockOnAcknowledge), Boolean.parseBoolean(blockOnDurableSend), Boolean.parseBoolean(blockOnNonDurableSend), Boolean.parseBoolean(autoGroup), Boolean.parseBoolean(preacknowledge), loadBalancingPolicyClassName, Integer.parseInt(transactionBatchSize), Integer.parseInt(dupsOkBatchSize), Boolean.parseBoolean(useGlobalPools), Integer.parseInt(scheduledThreadMaxPoolSize), Integer.parseInt(threadMaxPoolSize), Long.parseLong(retryInterval), Double.parseDouble(retryIntervalMultiplier), Long.parseLong(maxRetryInterval), Integer.parseInt(reconnectAttempts), Boolean.parseBoolean(failoverOnInitialConnection), groupId);
//Boolean.parseBoolean(cacheLargeMessagesClient));
managementSession.start();
ClientMessage reply = requestor.request(managementMessage);
if (ManagementHelper.hasOperationSucceeded(reply)) {
if (logger.isDebugEnabled()) {
logger.debug("Created connection factory " + name);
}
} else {
ActiveMQServerLogger.LOGGER.error("Problem creating " + name);
}
} else {
ActiveMQServerLogger.LOGGER.error("Problem creating " + name);
}
requestor.close();
}
private void createJmsDestination() throws Exception {
@ -884,24 +883,23 @@ public final class XmlDataImporter extends ActionAbstract {
reader.next();
}
ClientRequestor requestor = new ClientRequestor(managementSession, "jms.queue.activemq.management");
ClientMessage managementMessage = managementSession.createMessage(false);
if ("Queue".equals(type)) {
ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.JMS_SERVER, "createQueue", name, entries, selector);
} else if ("Topic".equals(type)) {
ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.JMS_SERVER, "createTopic", name, entries);
}
managementSession.start();
ClientMessage reply = requestor.request(managementMessage);
if (ManagementHelper.hasOperationSucceeded(reply)) {
if (logger.isDebugEnabled()) {
logger.debug("Created " + type.toLowerCase() + " " + name);
try (ClientRequestor requestor = new ClientRequestor(managementSession, "jms.queue.activemq.management")) {
ClientMessage managementMessage = managementSession.createMessage(false);
if ("Queue".equals(type)) {
ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.JMS_SERVER, "createQueue", name, entries, selector);
} else if ("Topic".equals(type)) {
ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.JMS_SERVER, "createTopic", name, entries);
}
managementSession.start();
ClientMessage reply = requestor.request(managementMessage);
if (ManagementHelper.hasOperationSucceeded(reply)) {
if (logger.isDebugEnabled()) {
logger.debug("Created " + type.toLowerCase() + " " + name);
}
} else {
ActiveMQServerLogger.LOGGER.error("Problem creating " + name);
}
} else {
ActiveMQServerLogger.LOGGER.error("Problem creating " + name);
}
requestor.close();
}
private String getEntries() throws Exception {

View File

@ -144,9 +144,10 @@ public final class JsonUtil {
CompositeData[] cds = new CompositeData[data.length];
for (int i1 = 0; i1 < data.length; i1++) {
String dataConverted = convertJsonValue(data[i1], String.class).toString();
ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(Base64.decode(dataConverted)));
ois.setWhiteList("java.util,java.lang,javax.management");
cds[i1] = (CompositeDataSupport) ois.readObject();
try (ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(Base64.decode(dataConverted)))) {
ois.setWhiteList("java.util,java.lang,javax.management");
cds[i1] = (CompositeDataSupport) ois.readObject();
}
}
innerVal = cds;
}

View File

@ -73,9 +73,10 @@ public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMe
int size = getInnerMessage().getBodyBuffer().readInt();
byte[] bytes = new byte[size];
getInnerMessage().getBodyBuffer().readBytes(bytes);
ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(bytes));
ois.setWhiteList(DEFAULT_WHITELIST);
ois.setBlackList(DEFAULT_BLACKLIST);
object = (Serializable) ois.readObject();
try (ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(bytes))) {
ois.setWhiteList(DEFAULT_WHITELIST);
ois.setBlackList(DEFAULT_BLACKLIST);
object = (Serializable) ois.readObject();
}
}
}

View File

@ -45,8 +45,7 @@ public class ConsumedObjectMessage extends ConsumedMessage {
byte[] body = new byte[size];
message.getBodyBuffer().readBytes(body);
ByteArrayInputStream bais = new ByteArrayInputStream(body);
try {
ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(bais);
try (ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(bais)) {
if (options != null) {
ois.setWhiteList(options.getDeserializationWhiteList());
ois.setBlackList(options.getDeserializationBlackList());

View File

@ -74,8 +74,7 @@ public class HttpMessageHelper {
message.getBodyBuffer().readBytes(body);
ByteArrayInputStream bais = new ByteArrayInputStream(body);
Object obj = null;
try {
ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(bais);
try (ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(bais)) {
if (jmsOptions != null) {
ois.setBlackList(jmsOptions.getDeserializationBlackList());
ois.setWhiteList(jmsOptions.getDeserializationWhiteList());

View File

@ -438,17 +438,18 @@ public class ScaleDownHandler {
private Integer getQueueID(ClientSession session, SimpleString queueName) throws Exception {
Integer queueID = -1;
ClientRequestor requestor = new ClientRequestor(session, "jms.queue.activemq.management");
ClientMessage managementMessage = session.createMessage(false);
ManagementHelper.putAttribute(managementMessage, "core.queue." + queueName, "ID");
session.start();
logger.debug("Requesting ID for: " + queueName);
ClientMessage reply = requestor.request(managementMessage);
Object result = ManagementHelper.getResult(reply);
Object result;
try (ClientRequestor requestor = new ClientRequestor(session, "jms.queue.activemq.management")) {
ClientMessage managementMessage = session.createMessage(false);
ManagementHelper.putAttribute(managementMessage, "core.queue." + queueName, "ID");
session.start();
logger.debug("Requesting ID for: " + queueName);
ClientMessage reply = requestor.request(managementMessage);
result = ManagementHelper.getResult(reply);
}
if (result != null && result instanceof Number) {
queueID = ((Number) result).intValue();
}
requestor.close();
return queueID;
}

View File

@ -73,9 +73,10 @@ public class InMemorySchemaPartition extends AbstractLdifPartition {
for (String resourcePath : new TreeSet<>(resMap.keySet())) {
if (resourcePath.endsWith(".ldif")) {
URL resource = DefaultSchemaLdifExtractor.getUniqueResource(resourcePath, "Schema LDIF file");
LdifReader reader = new LdifReader(resource.openStream());
LdifEntry ldifEntry = reader.next();
reader.close();
LdifEntry ldifEntry;
try (LdifReader reader = new LdifReader(resource.openStream())) {
ldifEntry = reader.next();
}
Entry entry = new DefaultEntry(schemaManager, ldifEntry.getEntry());
// add mandatory attributes