This commit is contained in:
Michael Andre Pearce 2019-01-17 22:41:56 +00:00
commit ba53bed20d
40 changed files with 177 additions and 172 deletions

View File

@ -201,7 +201,7 @@ public class ConsumerThread extends Thread {
System.out.println(threadName + " Committing transaction: " + transactions++);
session.commit();
}
} else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
} else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE && msg != null) {
if (batchSize > 0 && received > 0 && received % batchSize == 0) {
System.out.println("Acknowledging last " + batchSize + " messages; messages so far = " + received);
msg.acknowledge();

View File

@ -216,63 +216,63 @@ public class PrintData extends DBOption {
if (pgStore != null) {
folder = pgStore.getFolder();
}
out.println("####################################################################################################");
out.println("Exploring store " + store + " folder = " + folder);
int pgid = (int) pgStore.getFirstPage();
for (int pg = 0; pg < pgStore.getNumberOfPages(); pg++) {
out.println("******* Page " + pgid);
Page page = pgStore.createPage(pgid);
page.open();
List<PagedMessage> msgs = page.read(sm);
page.close();
out.println("####################################################################################################");
out.println("Exploring store " + store + " folder = " + folder);
int pgid = (int) pgStore.getFirstPage();
for (int pg = 0; pg < pgStore.getNumberOfPages(); pg++) {
out.println("******* Page " + pgid);
Page page = pgStore.createPage(pgid);
page.open();
List<PagedMessage> msgs = page.read(sm);
page.close();
int msgID = 0;
int msgID = 0;
for (PagedMessage msg : msgs) {
msg.initMessage(sm);
if (safe) {
try {
out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ", msg=" + msg.getMessage().getClass().getSimpleName() + "(safe data, size=" + msg.getMessage().getPersistentSize() + ")");
} catch (Exception e) {
out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ", msg=" + msg.getMessage().getClass().getSimpleName() + "(safe data)");
for (PagedMessage msg : msgs) {
msg.initMessage(sm);
if (safe) {
try {
out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ", msg=" + msg.getMessage().getClass().getSimpleName() + "(safe data, size=" + msg.getMessage().getPersistentSize() + ")");
} catch (Exception e) {
out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ", msg=" + msg.getMessage().getClass().getSimpleName() + "(safe data)");
}
} else {
out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ",userMessageID=" + (msg.getMessage().getUserID() != null ? msg.getMessage().getUserID() : "") + ", msg=" + msg.getMessage());
}
} else {
out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ",userMessageID=" + (msg.getMessage().getUserID() != null ? msg.getMessage().getUserID() : "") + ", msg=" + msg.getMessage());
out.print(",Queues = ");
long[] q = msg.getQueueIDs();
for (int i = 0; i < q.length; i++) {
out.print(q[i]);
PagePosition posCheck = new PagePositionImpl(pgid, msgID);
boolean acked = false;
Set<PagePosition> positions = cursorACKs.getCursorRecords().get(q[i]);
if (positions != null) {
acked = positions.contains(posCheck);
}
if (acked) {
out.print(" (ACK)");
}
if (cursorACKs.getCompletePages(q[i]).contains(Long.valueOf(pgid))) {
out.println(" (PG-COMPLETE)");
}
if (i + 1 < q.length) {
out.print(",");
}
}
if (msg.getTransactionID() >= 0 && !pgTXs.contains(msg.getTransactionID())) {
out.print(", **PG_TX_NOT_FOUND**");
}
out.println();
msgID++;
}
out.print(",Queues = ");
long[] q = msg.getQueueIDs();
for (int i = 0; i < q.length; i++) {
out.print(q[i]);
PagePosition posCheck = new PagePositionImpl(pgid, msgID);
boolean acked = false;
Set<PagePosition> positions = cursorACKs.getCursorRecords().get(q[i]);
if (positions != null) {
acked = positions.contains(posCheck);
}
if (acked) {
out.print(" (ACK)");
}
if (cursorACKs.getCompletePages(q[i]).contains(Long.valueOf(pgid))) {
out.println(" (PG-COMPLETE)");
}
if (i + 1 < q.length) {
out.print(",");
}
}
if (msg.getTransactionID() >= 0 && !pgTXs.contains(msg.getTransactionID())) {
out.print(", **PG_TX_NOT_FOUND**");
}
out.println();
msgID++;
pgid++;
}
pgid++;
}
}
}

View File

@ -75,8 +75,9 @@ public class DecodeJournal extends LockAbstract {
final int minFiles,
final int fileSize,
final String fileInput) throws Exception {
FileInputStream fileInputStream = new FileInputStream(new File(fileInput));
importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
try (FileInputStream fileInputStream = new FileInputStream(new File(fileInput))) {
importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
}
}
@ -86,8 +87,9 @@ public class DecodeJournal extends LockAbstract {
final int minFiles,
final int fileSize,
final InputStream stream) throws Exception {
Reader reader = new InputStreamReader(stream);
importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
try (Reader reader = new InputStreamReader(stream)) {
importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
}
}
public static void importJournal(final String directory,

View File

@ -71,9 +71,9 @@ public class XMLMessageImporter {
Byte type = 0;
Byte priority = 0;
Long expiration = 0L;
Long timestamp = 0L;
Long id = 0L;
long expiration = 0L;
long timestamp = 0L;
long id = 0L;
org.apache.activemq.artemis.utils.UUID userId = null;
ArrayList<String> queues = new ArrayList<>();
@ -276,7 +276,7 @@ public class XMLMessageImporter {
* CDATA has to be decoded in its entirety.
*
* @param processor used to deal with the decoded CDATA elements
* @param textMessage If this a text message we decode UTF8 and encode as a simple string
* @param decodeTextMessage If this a text message we decode UTF8 and encode as a simple string
*/
private void getMessageBodyBytes(MessageBodyBytesProcessor processor, boolean decodeTextMessage) throws IOException, XMLStreamException {
int currentEventType;

View File

@ -139,8 +139,10 @@ public final class XmlDataImporter extends ActionAbstract {
return null;
}
public void process(String inputFile, String host, int port, boolean transactional) throws Exception {
this.process(new FileInputStream(inputFile), host, port, transactional);
public void process(String inputFileName, String host, int port, boolean transactional) throws Exception {
try (FileInputStream inputFile = new FileInputStream(inputFileName)) {
this.process(inputFile, host, port, transactional);
}
}
/**
@ -207,8 +209,10 @@ public final class XmlDataImporter extends ActionAbstract {
process(inputStream, session, managementSession);
}
public void validate(String file) throws Exception {
validate(new FileInputStream(file));
public void validate(String fileName) throws Exception {
try (FileInputStream file = new FileInputStream(fileName)) {
validate(file);
}
}
public void validate(InputStream inputStream) throws Exception {

View File

@ -86,7 +86,7 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {
netToUse = null;
}
} catch (Exception e) {
ActiveMQUtilLogger.LOGGER.failedToSetNIC(e, nicName == null ? " " : nicName);
ActiveMQUtilLogger.LOGGER.failedToSetNIC(e, nicName);
netToUse = null;
}
@ -176,13 +176,13 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {
}
@Override
public NetworkHealthCheck setPeriod(long period) {
public synchronized NetworkHealthCheck setPeriod(long period) {
super.setPeriod(period);
return this;
}
@Override
public NetworkHealthCheck setTimeUnit(TimeUnit timeUnit) {
public synchronized NetworkHealthCheck setTimeUnit(TimeUnit timeUnit) {
super.setTimeUnit(timeUnit);
return this;
}
@ -326,6 +326,10 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {
}
public boolean check(InetAddress address) {
if (address == null) {
return false;
}
try {
if (address.isReachable(networkInterface, 0, networkTimeout)) {
if (logger.isTraceEnabled()) {
@ -336,7 +340,7 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {
return purePing(address);
}
} catch (Exception e) {
ActiveMQUtilLogger.LOGGER.failedToCheckAddress(e, address == null ? " " : address.toString());
ActiveMQUtilLogger.LOGGER.failedToCheckAddress(e, address.toString());
return false;
}
}
@ -392,6 +396,10 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {
}
public boolean check(URL url) {
if (url == null) {
return false;
}
try {
URLConnection connection = url.openConnection();
connection.setReadTimeout(networkTimeout);
@ -399,7 +407,7 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {
is.close();
return true;
} catch (Exception e) {
ActiveMQUtilLogger.LOGGER.failedToCheckURL(e, url == null ? " " : url.toString());
ActiveMQUtilLogger.LOGGER.failedToCheckURL(e, url.toString());
return false;
}
}

View File

@ -101,17 +101,10 @@ public class FactoryFinder {
}
// lets load the file
BufferedInputStream reader = null;
try {
reader = new BufferedInputStream(in);
try (BufferedInputStream reader = new BufferedInputStream(in)) {
Properties properties = new Properties();
properties.load(reader);
return properties;
} finally {
try {
reader.close();
} catch (Exception e) {
}
}
}
}

View File

@ -295,12 +295,12 @@ public class ConcurrentLongHashMap<V> {
if (storedKey == key) {
if (storedValue == EmptyValue) {
values[bucket] = value != null ? value : valueProvider.apply(key);
values[bucket] = value != null ? value : (valueProvider != null ? valueProvider.apply(key) : null);
++size;
++usedBuckets;
return valueProvider != null ? values[bucket] : null;
} else if (storedValue == DeletedValue) {
values[bucket] = value != null ? value : valueProvider.apply(key);
values[bucket] = value != null ? value : (valueProvider != null ? valueProvider.apply(key) : null);
++size;
return valueProvider != null ? values[bucket] : null;
} else if (!onlyIfAbsent) {
@ -320,7 +320,7 @@ public class ConcurrentLongHashMap<V> {
}
keys[bucket] = key;
values[bucket] = value != null ? value : valueProvider.apply(key);
values[bucket] = value != null ? value : (valueProvider != null ? valueProvider.apply(key) : null);
++size;
return valueProvider != null ? values[bucket] : null;
} else if (storedValue == DeletedValue) {

View File

@ -145,7 +145,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
doSend(address1, message, null);
if (handler != null) {
if (logger.isDebugEnabled()) {
logger.debug("Handler was used on producing messages towards address " + address1.toString() + " however there is no confirmationWindowEnabled");
logger.debug("Handler was used on producing messages towards address " + (address1 == null ? null : address1.toString()) + " however there is no confirmationWindowEnabled");
}
if (!confirmationNotSetLogged) {

View File

@ -469,8 +469,8 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
} else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2) {
ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2) packet;
notifyTopologyChange(updateTransportConfiguration(topMessage));
} else if (type == PacketImpl.CLUSTER_TOPOLOGY || type == PacketImpl.CLUSTER_TOPOLOGY_V2 || type == PacketImpl.CLUSTER_TOPOLOGY_V3) {
ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet;
} else if (type == PacketImpl.CLUSTER_TOPOLOGY_V3) {
ClusterTopologyChangeMessage_V3 topMessage = (ClusterTopologyChangeMessage_V3) packet;
notifyTopologyChange(updateTransportConfiguration(topMessage));
} else if (type == PacketImpl.CHECK_FOR_FAILOVER_REPLY) {
System.out.println("Channel0Handler.handlePacket");

View File

@ -695,7 +695,7 @@ public final class ChannelImpl implements Channel {
clearUpTo(msg.getCommandID());
}
if (!connection.isClient()) {
if (!connection.isClient() && handler != null) {
handler.handlePacket(packet);
}

View File

@ -1181,7 +1181,7 @@ public class NettyConnector extends AbstractConnector {
@Override
public boolean isEquivalent(Map<String, Object> configuration) {
Boolean httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED, configuration);
boolean httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED, configuration);
if (httpUpgradeEnabled) {
// we need to look at the activemqServerName to distinguish between ActiveMQ servers that could be proxied behind the same
// HTTP upgrade handler in the Web server
@ -1198,9 +1198,9 @@ public class NettyConnector extends AbstractConnector {
//here we only check host and port because these two parameters
//is sufficient to determine the target host
String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
Integer port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration);
int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration);
if (!port.equals(this.port))
if (port != this.port)
return false;
if (host.equals(this.host))

View File

@ -199,7 +199,7 @@ public abstract class AbstractJDBCDriver {
connection.setAutoCommit(false);
final boolean tableExists;
try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) {
if ((rs == null) || (rs != null && !rs.next())) {
if (rs == null || !rs.next()) {
tableExists = false;
if (logger.isTraceEnabled()) {
logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, Arrays.toString(sqls));

View File

@ -433,7 +433,7 @@ public class ActiveMQMessage implements javax.jms.Message {
dest = (ActiveMQDestination) ActiveMQDestination.fromPrefixedName(address.toString());
}
if (changedAddress != null) {
if (changedAddress != null && dest != null) {
((ActiveMQDestination) dest).setName(changedAddress.toString());
}
}
@ -902,7 +902,7 @@ public class ActiveMQMessage implements javax.jms.Message {
private void checkProperty(final String name) throws JMSException {
if (propertiesReadOnly) {
if (name.equals(ActiveMQJMSConstants.JMS_ACTIVEMQ_INPUT_STREAM)) {
if (name != null && name.equals(ActiveMQJMSConstants.JMS_ACTIVEMQ_INPUT_STREAM)) {
throw new MessageNotWriteableException("You cannot set the Input Stream on received messages. Did you mean " + ActiveMQJMSConstants.JMS_ACTIVEMQ_OUTPUT_STREAM +
" or " +
ActiveMQJMSConstants.JMS_ACTIVEMQ_SAVE_STREAM +

View File

@ -370,7 +370,9 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
public void sequentialDone() {
if (error) {
callback.onError(errorCode, errorMessage);
if (callback != null) {
callback.onError(errorCode, errorMessage);
}
onIOError(new ActiveMQException(errorCode, errorMessage), errorMessage, null);
errorMessage = null;
} else {

View File

@ -323,12 +323,8 @@ public class CoreAmqpConverter {
if (daMap != null) {
encoder.writeObject(new DeliveryAnnotations(daMap));
}
if (maMap != null) {
encoder.writeObject(new MessageAnnotations(maMap));
}
if (properties != null) {
encoder.writeObject(properties);
}
encoder.writeObject(new MessageAnnotations(maMap));
encoder.writeObject(properties);
if (apMap != null) {
encoder.writeObject(new ApplicationProperties(apMap));
}

View File

@ -1261,7 +1261,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
} else {
tx.rollback();
if (tx != null) {
tx.rollback();
}
}
return null;
@ -1413,7 +1415,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
} else {
tx.commit(onePhase);
if (tx != null) {
tx.commit(onePhase);
}
}
return null;

View File

@ -782,7 +782,7 @@ public final class OpenWireMessageConverter {
MarshallingSupport.marshalDouble(dataOut, doubleVal);
break;
case DataConstants.FLOAT:
Float floatVal = Float.intBitsToFloat(buffer.readInt());
float floatVal = Float.intBitsToFloat(buffer.readInt());
MarshallingSupport.marshalFloat(dataOut, floatVal);
break;
case DataConstants.INT:

View File

@ -250,9 +250,8 @@ public final class ActiveMQRAManagedConnection implements ManagedConnection, Exc
} catch (JMSException e) {
logger.debug("Error unsetting the exception listener " + this, e);
}
if (connection != null) {
connection.signalStopToAllSessions();
}
connection.signalStopToAllSessions();
try {
// we must close the ActiveMQConnectionFactory because it contains a ServerLocator
@ -277,9 +276,7 @@ public final class ActiveMQRAManagedConnection implements ManagedConnection, Exc
* <p>
* connection close will close the ClientSessionFactory which will close all sessions.
*/
if (connection != null) {
connection.close();
}
connection.close();
if (nonXAsession != null) {
nonXAsession.close();

View File

@ -934,12 +934,11 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
return false;
} else if (!this.producerWindowSize.equals(other.producerWindowSize))
return false;
else if (!protocolManagerFactoryStr.equals(other.protocolManagerFactoryStr))
return false;
if (this.protocolManagerFactoryStr == null) {
if (other.protocolManagerFactoryStr != null)
return false;
}
} else if (!protocolManagerFactoryStr.equals(other.protocolManagerFactoryStr))
return false;
if (this.reconnectAttempts == null) {
if (other.reconnectAttempts != null)
return false;
@ -1007,7 +1006,7 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
if (enableSharedClientID == null) {
if (other.enableSharedClientID != null)
return false;
} else if (!enableSharedClientID == other.enableSharedClientID)
} else if (!this.enableSharedClientID.equals(other.enableSharedClientID))
return false;
return true;

View File

@ -822,7 +822,7 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen
ActiveMQActivationSpec that = (ActiveMQActivationSpec) o;
if (acknowledgeMode != that.acknowledgeMode)
if (acknowledgeMode != null ? !acknowledgeMode.equals(that.acknowledgeMode) : that.acknowledgeMode != null)
return false;
if (subscriptionDurability != that.subscriptionDurability)
return false;

View File

@ -122,10 +122,11 @@ public class MessageServiceManager {
url = new URL(configResourcePath);
}
JAXBContext jaxb = JAXBContext.newInstance(MessageServiceConfiguration.class);
Reader reader = new InputStreamReader(url.openStream());
String xml = XMLUtil.readerToString(reader);
xml = XMLUtil.replaceSystemProps(xml);
configuration = (MessageServiceConfiguration) jaxb.createUnmarshaller().unmarshal(new StringReader(xml));
try (Reader reader = new InputStreamReader(url.openStream())) {
String xml = XMLUtil.readerToString(reader);
xml = XMLUtil.replaceSystemProps(xml);
configuration = (MessageServiceConfiguration) jaxb.createUnmarshaller().unmarshal(new StringReader(xml));
}
}
}
if (threadPool == null)

View File

@ -148,7 +148,7 @@ public class SubscriptionsResource implements TimeoutTask.Callback {
ActiveMQRestLogger.LOGGER.debug("Handling POST request for \"" + uriInfo.getPath() + "\"");
if (timeout == null)
timeout = Long.valueOf(consumerTimeoutSeconds * 1000);
timeout = Long.valueOf((long) consumerTimeoutSeconds * 1000);
boolean deleteWhenIdle = !durable; // default is true if non-durable
if (destroyWhenIdle != null)
deleteWhenIdle = destroyWhenIdle.booleanValue();

View File

@ -96,7 +96,7 @@ public class TimeoutTask implements Runnable {
public void run() {
while (running) {
try {
Thread.sleep(interval * 1000);
Thread.sleep((long) interval * 1000);
} catch (InterruptedException e) {
running = false;
break;

View File

@ -69,21 +69,22 @@ public class FileDeploymentManager {
url = new URL(configurationUrl);
}
// create a reader
Reader reader = new InputStreamReader(url.openStream());
String xml = XMLUtil.readerToString(reader);
//replace any system props
xml = XMLUtil.replaceSystemProps(xml);
Element e = XMLUtil.stringToElement(xml);
try (Reader reader = new InputStreamReader(url.openStream())) {
String xml = XMLUtil.readerToString(reader);
//replace any system props
xml = XMLUtil.replaceSystemProps(xml);
Element e = XMLUtil.stringToElement(xml);
//iterate around all the deployables
for (Deployable deployable : deployables.values()) {
String root = deployable.getRootElement();
NodeList children = e.getElementsByTagName(root);
//if the root element exists then parse it
if (root != null && children.getLength() > 0) {
Node item = children.item(0);
XMLUtil.validate(item, deployable.getSchema());
deployable.parse((Element) item, url);
//iterate around all the deployables
for (Deployable deployable : deployables.values()) {
String root = deployable.getRootElement();
NodeList children = e.getElementsByTagName(root);
//if the root element exists then parse it
if (root != null && children.getLength() > 0) {
Node item = children.item(0);
XMLUtil.validate(item, deployable.getSchema());
deployable.parse((Element) item, url);
}
}
}
}

View File

@ -2044,9 +2044,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
if (diskScanPeriod != other.diskScanPeriod) {
return false;
}
if (connectionTtlCheckInterval != other.connectionTtlCheckInterval) {
return false;
}
return true;
}

View File

@ -1251,17 +1251,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
return configurations.get(0);
}
private static final ArrayList<String> POLICY_LIST = new ArrayList<>();
static {
POLICY_LIST.add("colocated");
POLICY_LIST.add("live-only");
POLICY_LIST.add("replicated");
POLICY_LIST.add("replica");
POLICY_LIST.add("shared-store-master");
POLICY_LIST.add("shared-store-slave");
}
private static final ArrayList<String> HA_LIST = new ArrayList<>();
static {

View File

@ -1131,7 +1131,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
clearIO();
try {
long index = 0;
long start = (page - 1) * pageSize;
long start = (long) (page - 1) * pageSize;
long end = Math.min(page * pageSize, queue.getMessageCount());
ArrayList<CompositeData> c = new ArrayList<>();

View File

@ -40,6 +40,7 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.UPDATE_DELIVERY_COUNT;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.Reader;
@ -113,10 +114,11 @@ public final class DescribeJournal {
configuration = new FileConfiguration();
File configFile = new File(instanceFolder + "/etc/broker.xml");
URL url;
Reader reader = null;
try {
url = configFile.toURI().toURL();
Reader reader = new InputStreamReader(url.openStream());
reader = new InputStreamReader(url.openStream());
String xml = XMLUtil.readerToString(reader);
xml = XMLUtil.replaceSystemProps(xml);
Element e = XMLUtil.stringToElement(xml);
@ -130,6 +132,14 @@ public final class DescribeJournal {
}
} catch (Exception e) {
logger.error("failed to load broker.xml", e);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
// ignore
}
}
}
} else {
configuration = new ConfigurationImpl();

View File

@ -1325,7 +1325,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
}
if (deliveryTime > 0) {
if (deliveryTime != null && deliveryTime > 0) {
if (tx != null) {
storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
} else {

View File

@ -291,8 +291,8 @@ public class ColocatedHAManager implements HAManager {
}
Object serverId = params.get(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME);
if (serverId != null) {
Integer newid = Integer.parseInt(serverId.toString()) + portOffset;
params.put(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME, newid.toString());
int newid = Integer.parseInt(serverId.toString()) + portOffset;
params.put(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME, newid);
}
params.put(TransportConstants.ACTIVEMQ_SERVER_NAME, name);
}

View File

@ -196,16 +196,16 @@ public class ClusterConnectionBridge extends BridgeImpl {
}
private void setupNotificationConsumer() throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Setting up notificationConsumer between " + this.clusterConnection.getConnector() +
" and " +
flowRecord.getBridge().getForwardingConnection() +
" clusterConnection = " +
this.clusterConnection.getName() +
" on server " +
clusterConnection.getServer());
}
if (flowRecord != null) {
if (logger.isDebugEnabled()) {
logger.debug("Setting up notificationConsumer between " + this.clusterConnection.getConnector() +
" and " +
flowRecord.getBridge().getForwardingConnection() +
" clusterConnection = " +
this.clusterConnection.getName() +
" on server " +
clusterConnection.getServer());
}
flowRecord.reset();
if (notifConsumer != null) {

View File

@ -130,9 +130,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
* however we need the guard to synchronize multiple step operations during topology updates.
*/
private final Object recordsGuard = new Object();
private final Map<String, MessageFlowRecord> records = new ConcurrentHashMap<>();
private final Map<String, MessageFlowRecord> disconnectedRecords = new ConcurrentHashMap<>();
private final Map<String, MessageFlowRecord> records = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduledExecutor;

View File

@ -146,10 +146,10 @@ public class LiveOnlyActivation extends Activation {
ClientSessionFactoryInternal clientSessionFactory = null;
while (clientSessionFactory == null) {
Pair<TransportConfiguration, TransportConfiguration> possibleLive = null;
possibleLive = nodeLocator.getLiveConfiguration();
if (possibleLive == null) // we've tried every connector
break;
try {
possibleLive = nodeLocator.getLiveConfiguration();
if (possibleLive == null) // we've tried every connector
break;
clientSessionFactory = (ClientSessionFactoryInternal) scaleDownServerLocator.createSessionFactory(possibleLive.getA(), 0, false);
} catch (Exception e) {
logger.trace("Failed to connect to " + possibleLive.getA());

View File

@ -3685,7 +3685,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (slowConsumerReaperRunnable == null) {
scheduleSlowConsumerReaper(settings);
} else if (slowConsumerReaperRunnable.checkPeriod != settings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.threshold != settings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(settings.getSlowConsumerPolicy())) {
slowConsumerReaperFuture.cancel(false);
if (slowConsumerReaperFuture != null) {
slowConsumerReaperFuture.cancel(false);
slowConsumerReaperFuture = null;
}
scheduleSlowConsumerReaper(settings);
}
}

View File

@ -389,7 +389,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// should go back into the
// queue for delivery later.
// TCP-flow control has to be done first than everything else otherwise we may lose notifications
if (!callback.isWritable(this, protocolContext) || !started || transferring) {
if ((callback != null && !callback.isWritable(this, protocolContext)) || !started || transferring) {
return HandleStatus.BUSY;
}

View File

@ -215,7 +215,7 @@ public final class SharedNothingBackupActivation extends Activation {
try {
if (logger.isTraceEnabled()) {
logger.trace("Calling clusterController.connectToNodeInReplicatedCluster(" + possibleLive.getA() + ")");
logger.trace("Calling clusterController.connectToNodeInReplicatedCluster(" + possibleLive != null ? possibleLive.getA() : null + ")");
}
clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getA());
} catch (Exception e) {

View File

@ -51,7 +51,7 @@ public interface ActiveMQServerConsumerPlugin extends ActiveMQServerBasePlugin {
* Before a consumer is created
*
* @param consumerID
* @param QueueBinding
* @param queueBinding
* @param filterString
* @param browseOnly
* @param supportLargeMessage

View File

@ -181,9 +181,9 @@ public class TransactionImpl implements Transaction {
synchronized (timeoutLock) {
boolean timedout;
if (timeoutSeconds == -1) {
timedout = getState() != Transaction.State.PREPARED && currentTime > createTime + defaultTimeout * 1000;
timedout = getState() != Transaction.State.PREPARED && currentTime > createTime + (long) defaultTimeout * 1000;
} else {
timedout = getState() != Transaction.State.PREPARED && currentTime > createTime + timeoutSeconds * 1000;
timedout = getState() != Transaction.State.PREPARED && currentTime > createTime + (long) timeoutSeconds * 1000;
}
if (timedout) {

View File

@ -184,7 +184,7 @@ public class LDAPLoginModule implements LoginModule {
* requests (by verifying that the supplied password is not empty) and
* react appropriately.
*/
if (password == null || (password != null && password.length() == 0))
if (password == null || password.length() == 0)
throw new FailedLoginException("Password cannot be null or empty");
// authenticate will throw LoginException