This commit is based off a patch submitted by admin@int3solutions.com.
It fixes several locations in the code where there are potential resource leaks.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-06-22 18:34:37 +00:00 committed by Daniel Kulp
parent 2c166b7605
commit 42d9fd118d
21 changed files with 273 additions and 241 deletions

View File

@ -272,14 +272,16 @@ public class BrokerService implements Service {
} }
LOCAL_HOST_NAME = localHostName; LOCAL_HOST_NAME = localHostName;
InputStream in = null;
String version = null; String version = null;
if ((in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) { try(InputStream in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) {
BufferedReader reader = new BufferedReader(new InputStreamReader(in)); if (in != null) {
try { try(InputStreamReader isr = new InputStreamReader(in);
version = reader.readLine(); BufferedReader reader = new BufferedReader(isr)) {
} catch(Exception e) { version = reader.readLine();
}
} }
} catch (IOException ie) {
LOG.warn("Error reading broker version ", ie);
} }
BROKER_VERSION = version; BROKER_VERSION = version;
} }

View File

@ -31,14 +31,14 @@ import org.apache.activemq.util.IntrospectionSupport;
/** /**
* A {@link BrokerFactoryHandler} which uses a properties file to configure the * A {@link BrokerFactoryHandler} which uses a properties file to configure the
* broker's various policies. * broker's various policies.
* *
* *
*/ */
public class PropertiesBrokerFactory implements BrokerFactoryHandler { public class PropertiesBrokerFactory implements BrokerFactoryHandler {
public BrokerService createBroker(URI brokerURI) throws Exception { public BrokerService createBroker(URI brokerURI) throws Exception {
Map properties = loadProperties(brokerURI); Map<Object, Object> properties = loadProperties(brokerURI);
BrokerService brokerService = createBrokerService(brokerURI, properties); BrokerService brokerService = createBrokerService(brokerURI, properties);
IntrospectionSupport.setProperties(brokerService, properties); IntrospectionSupport.setProperties(brokerService, properties);
@ -48,14 +48,31 @@ public class PropertiesBrokerFactory implements BrokerFactoryHandler {
/** /**
* Lets load the properties from some external URL or a relative file * Lets load the properties from some external URL or a relative file
*/ */
protected Map loadProperties(URI brokerURI) throws IOException { protected Map<Object, Object> loadProperties(URI brokerURI) throws IOException {
// lets load a URI // lets load a URI
String remaining = brokerURI.getSchemeSpecificPart(); String remaining = brokerURI.getSchemeSpecificPart();
Properties properties = new Properties(); Properties properties = new Properties();
File file = new File(remaining); File file = new File(remaining);
try (InputStream inputStream = loadStream(file, remaining)) {
if (inputStream != null) {
properties.load(inputStream);
}
}
// should we append any system properties?
try {
Properties systemProperties = System.getProperties();
properties.putAll(systemProperties);
} catch (Exception e) {
// ignore security exception
}
return properties;
}
protected InputStream loadStream(File file, String remaining) throws IOException {
InputStream inputStream = null; InputStream inputStream = null;
if (file.exists()) { if (file != null && file.exists()) {
inputStream = new FileInputStream(file); inputStream = new FileInputStream(file);
} else { } else {
URL url = null; URL url = null;
@ -72,19 +89,7 @@ public class PropertiesBrokerFactory implements BrokerFactoryHandler {
inputStream = url.openStream(); inputStream = url.openStream();
} }
} }
if (inputStream != null) { return inputStream;
properties.load(inputStream);
inputStream.close();
}
// should we append any system properties?
try {
Properties systemProperties = System.getProperties();
properties.putAll(systemProperties);
} catch (Exception e) {
// ignore security exception
}
return properties;
} }
protected InputStream findResourceOnClassPath(String remaining) { protected InputStream findResourceOnClassPath(String remaining) {
@ -95,7 +100,7 @@ public class PropertiesBrokerFactory implements BrokerFactoryHandler {
return answer; return answer;
} }
protected BrokerService createBrokerService(URI brokerURI, Map properties) { protected BrokerService createBrokerService(URI brokerURI, Map<Object, Object> properties) {
return new BrokerService(); return new BrokerService();
} }
} }

View File

@ -281,14 +281,17 @@ public final class IOHelper {
} }
public static void copyInputStream(InputStream in, OutputStream out) throws IOException { public static void copyInputStream(InputStream in, OutputStream out) throws IOException {
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE]; try {
int len = in.read(buffer); byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
while (len >= 0) { int len = in.read(buffer);
out.write(buffer, 0, len); while (len >= 0) {
len = in.read(buffer); out.write(buffer, 0, len);
len = in.read(buffer);
}
} finally {
in.close();
out.close();
} }
in.close();
out.close();
} }
static { static {

View File

@ -504,7 +504,13 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* <CODE>BlobMessage</CODE> object is used to send a message containing * <CODE>BlobMessage</CODE> object is used to send a message containing
* the <CODE>File</CODE> content. Before the message is sent the file * the <CODE>File</CODE> content. Before the message is sent the file
* conent will be uploaded to the broker or some other remote repository * conent will be uploaded to the broker or some other remote repository
* depending on the {@link #getBlobTransferPolicy()}. * depending on the {@link #getBlobTransferPolicy()}. <br/>
* <p>
* The caller of this method is responsible for closing the
* input stream that is used, however the stream can not be closed
* until <b>after</b> the message has been sent. To have this class
* manage the stream and close it automatically, use the method
* {@link ActiveMQSession#createBlobMessage(File)}
* *
* @param in the stream to be uploaded to some remote repo (or the broker) * @param in the stream to be uploaded to some remote repo (or the broker)
* depending on the strategy * depending on the strategy

View File

@ -130,13 +130,14 @@ public class ActiveMQSslConnectionFactory extends ActiveMQConnectionFactory {
KeyStore trustedCertStore = KeyStore.getInstance(getTrustStoreType()); KeyStore trustedCertStore = KeyStore.getInstance(getTrustStoreType());
if (trustStore != null) { if (trustStore != null) {
InputStream tsStream = getInputStream(trustStore); try(InputStream tsStream = getInputStream(trustStore)) {
trustedCertStore.load(tsStream, trustStorePassword.toCharArray()); trustedCertStore.load(tsStream, trustStorePassword.toCharArray());
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(trustedCertStore); tmf.init(trustedCertStore);
trustStoreManagers = tmf.getTrustManagers(); trustStoreManagers = tmf.getTrustManagers();
}
} }
return trustStoreManagers; return trustStoreManagers;
} }
@ -149,10 +150,11 @@ public class ActiveMQSslConnectionFactory extends ActiveMQConnectionFactory {
byte[] sslCert = loadClientCredential(keyStore); byte[] sslCert = loadClientCredential(keyStore);
if (sslCert != null && sslCert.length > 0) { if (sslCert != null && sslCert.length > 0) {
ByteArrayInputStream bin = new ByteArrayInputStream(sslCert); try(ByteArrayInputStream bin = new ByteArrayInputStream(sslCert)) {
ks.load(bin, keyStorePassword.toCharArray()); ks.load(bin, keyStorePassword.toCharArray());
kmf.init(ks, keyStoreKeyPassword !=null ? keyStoreKeyPassword.toCharArray() : keyStorePassword.toCharArray()); kmf.init(ks, keyStoreKeyPassword !=null ? keyStoreKeyPassword.toCharArray() : keyStorePassword.toCharArray());
keystoreManagers = kmf.getKeyManagers(); keystoreManagers = kmf.getKeyManagers();
}
} }
} }
return keystoreManagers; return keystoreManagers;
@ -162,16 +164,16 @@ public class ActiveMQSslConnectionFactory extends ActiveMQConnectionFactory {
if (fileName == null) { if (fileName == null) {
return null; return null;
} }
InputStream in = getInputStream(fileName); try(InputStream in = getInputStream(fileName);
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream()) {
byte[] buf = new byte[512]; byte[] buf = new byte[512];
int i = in.read(buf); int i = in.read(buf);
while (i > 0) { while (i > 0) {
out.write(buf, 0, i); out.write(buf, 0, i);
i = in.read(buf); i = in.read(buf);
}
return out.toByteArray();
} }
in.close();
return out.toByteArray();
} }
protected InputStream getInputStream(String urlOrResource) throws IOException { protected InputStream getInputStream(String urlOrResource) throws IOException {

View File

@ -27,8 +27,8 @@ import org.apache.activemq.command.ActiveMQBlobMessage;
/** /**
* A helper class to represent a required upload of a BLOB to some remote URL * A helper class to represent a required upload of a BLOB to some remote URL
* *
* *
*/ */
public class BlobUploader { public class BlobUploader {

View File

@ -40,7 +40,9 @@ public class DefaultBlobUploadStrategy extends DefaultStrategy implements BlobUp
} }
public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException { public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException {
return uploadStream(message, new FileInputStream(file)); try(FileInputStream fis = new FileInputStream(file)) {
return uploadStream(message, fis);
}
} }
public URL uploadStream(ActiveMQBlobMessage message, InputStream fis) throws JMSException, IOException { public URL uploadStream(ActiveMQBlobMessage message, InputStream fis) throws JMSException, IOException {
@ -55,15 +57,13 @@ public class DefaultBlobUploadStrategy extends DefaultStrategy implements BlobUp
// (chunked mode not supported before JRE 1.5) // (chunked mode not supported before JRE 1.5)
connection.setChunkedStreamingMode(transferPolicy.getBufferSize()); connection.setChunkedStreamingMode(transferPolicy.getBufferSize());
OutputStream os = connection.getOutputStream(); try(OutputStream os = connection.getOutputStream()) {
byte[] buf = new byte[transferPolicy.getBufferSize()];
byte[] buf = new byte[transferPolicy.getBufferSize()]; for (int c = fis.read(buf); c != -1; c = fis.read(buf)) {
for (int c = fis.read(buf); c != -1; c = fis.read(buf)) { os.write(buf, 0, c);
os.write(buf, 0, c); os.flush();
os.flush(); }
} }
os.close();
fis.close();
if (!isSuccessfulCode(connection.getResponseCode())) { if (!isSuccessfulCode(connection.getResponseCode())) {
throw new IOException("PUT was not successful: " + connection.getResponseCode() + " " throw new IOException("PUT was not successful: " + connection.getResponseCode() + " "

View File

@ -39,7 +39,9 @@ public class FTPBlobUploadStrategy extends FTPStrategy implements BlobUploadStra
@Override @Override
public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException { public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException {
return uploadStream(message, new FileInputStream(file)); try(FileInputStream fis = new FileInputStream(file)) {
return uploadStream(message, fis);
}
} }
@Override @Override

View File

@ -36,19 +36,19 @@ import org.apache.activemq.command.ActiveMQBlobMessage;
*/ */
public class FileSystemBlobStrategy implements BlobUploadStrategy, BlobDownloadStrategy{ public class FileSystemBlobStrategy implements BlobUploadStrategy, BlobDownloadStrategy{
private final BlobTransferPolicy policy; private final BlobTransferPolicy policy;
private File rootFile; private File rootFile;
public FileSystemBlobStrategy(final BlobTransferPolicy policy) throws MalformedURLException, URISyntaxException { public FileSystemBlobStrategy(final BlobTransferPolicy policy) throws MalformedURLException, URISyntaxException {
this.policy = policy; this.policy = policy;
createRootFolder(); createRootFolder();
} }
/** /**
* Create the root folder if not exist * Create the root folder if not exist
* *
* @throws MalformedURLException * @throws MalformedURLException
* @throws URISyntaxException * @throws URISyntaxException
*/ */
@ -65,7 +65,9 @@ public class FileSystemBlobStrategy implements BlobUploadStrategy, BlobDownloadS
* @see org.apache.activemq.blob.BlobUploadStrategy#uploadFile(org.apache.activemq.command.ActiveMQBlobMessage, java.io.File) * @see org.apache.activemq.blob.BlobUploadStrategy#uploadFile(org.apache.activemq.command.ActiveMQBlobMessage, java.io.File)
*/ */
public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException { public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException {
return uploadStream(message, new FileInputStream(file)); try(FileInputStream fis = new FileInputStream(file)) {
return uploadStream(message, fis);
}
} }
/* /*
@ -74,14 +76,13 @@ public class FileSystemBlobStrategy implements BlobUploadStrategy, BlobDownloadS
*/ */
public URL uploadStream(ActiveMQBlobMessage message, InputStream in) throws JMSException, IOException { public URL uploadStream(ActiveMQBlobMessage message, InputStream in) throws JMSException, IOException {
File f = getFile(message); File f = getFile(message);
FileOutputStream out = new FileOutputStream(f); try(FileOutputStream out = new FileOutputStream(f)) {
byte[] buffer = new byte[policy.getBufferSize()]; byte[] buffer = new byte[policy.getBufferSize()];
for (int c = in.read(buffer); c != -1; c = in.read(buffer)) { for (int c = in.read(buffer); c != -1; c = in.read(buffer)) {
out.write(buffer, 0, c); out.write(buffer, 0, c);
out.flush(); out.flush();
}
} }
out.flush();
out.close();
// File.toURL() is deprecated // File.toURL() is deprecated
return f.toURI().toURL(); return f.toURI().toURL();
} }
@ -104,14 +105,14 @@ public class FileSystemBlobStrategy implements BlobUploadStrategy, BlobDownloadS
return new FileInputStream(getFile(message)); return new FileInputStream(getFile(message));
} }
/** /**
* Return the {@link File} for the {@link ActiveMQBlobMessage}. * Return the {@link File} for the {@link ActiveMQBlobMessage}.
* *
* @param message * @param message
* @return file * @return file
* @throws JMSException * @throws JMSException
* @throws IOException * @throws IOException
*/ */
protected File getFile(ActiveMQBlobMessage message) throws JMSException, IOException { protected File getFile(ActiveMQBlobMessage message) throws JMSException, IOException {
if (message.getURL() != null) { if (message.getURL() != null) {
@ -123,8 +124,8 @@ public class FileSystemBlobStrategy implements BlobUploadStrategy, BlobDownloadS
} }
} }
//replace all : with _ to make windows more happy //replace all : with _ to make windows more happy
String fileName = message.getJMSMessageID().replaceAll(":", "_"); String fileName = message.getJMSMessageID().replaceAll(":", "_");
return new File(rootFile, fileName); return new File(rootFile, fileName);
} }
} }

View File

@ -56,7 +56,7 @@ public class CreateCommand extends AbstractCommand {
protected String amqConf = "conf/activemq.xml"; // default conf if no conf is specified via --amqconf protected String amqConf = "conf/activemq.xml"; // default conf if no conf is specified via --amqconf
// default files to create // default files to create
protected String[][] fileWriteMap = { protected String[][] fileWriteMap = {
{ "winActivemq", "bin/${brokerName}.bat" }, { "winActivemq", "bin/${brokerName}.bat" },
{ "unixActivemq", "bin/${brokerName}" } { "unixActivemq", "bin/${brokerName}" }
@ -84,7 +84,7 @@ public class CreateCommand extends AbstractCommand {
targetAmqBase = new File(token); targetAmqBase = new File(token);
brokerName = targetAmqBase.getName(); brokerName = targetAmqBase.getName();
if (targetAmqBase.exists()) { if (targetAmqBase.exists()) {
BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
@ -191,9 +191,10 @@ public class CreateCommand extends AbstractCommand {
buf.put(data.getBytes()); buf.put(data.getBytes());
buf.flip(); buf.flip();
FileChannel destinationChannel = new FileOutputStream(dest).getChannel(); try(FileOutputStream fos = new FileOutputStream(dest);
destinationChannel.write(buf); FileChannel destinationChannel = fos.getChannel()) {
destinationChannel.close(); destinationChannel.write(buf);
}
// Set file permissions available for Java 6.0 only // Set file permissions available for Java 6.0 only
dest.setExecutable(true); dest.setExecutable(true);
@ -215,11 +216,13 @@ public class CreateCommand extends AbstractCommand {
if (!from.exists()) { if (!from.exists()) {
return; return;
} }
FileChannel sourceChannel = new FileInputStream(from).getChannel();
FileChannel destinationChannel = new FileOutputStream(dest).getChannel(); try(FileInputStream fis = new FileInputStream(from);
sourceChannel.transferTo(0, sourceChannel.size(), destinationChannel); FileChannel sourceChannel = fis.getChannel();
sourceChannel.close(); FileOutputStream fos = new FileOutputStream(dest);
destinationChannel.close(); FileChannel destinationChannel = fos.getChannel()) {
sourceChannel.transferTo(0, sourceChannel.size(), destinationChannel);
}
} }
private void copyConfDirectory(File from, File dest) throws IOException { private void copyConfDirectory(File from, File dest) throws IOException {

View File

@ -24,7 +24,7 @@ import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
/** /**
* *
*/ */
public final class IOHelper { public final class IOHelper {
protected static final int MAX_DIR_NAME_LENGTH; protected static final int MAX_DIR_NAME_LENGTH;
@ -53,7 +53,7 @@ public final class IOHelper {
return ""; return "";
} }
} }
public static boolean deleteFile(File fileToDelete) { public static boolean deleteFile(File fileToDelete) {
if (fileToDelete == null || !fileToDelete.exists()) { if (fileToDelete == null || !fileToDelete.exists()) {
return true; return true;
@ -62,7 +62,7 @@ public final class IOHelper {
result &= fileToDelete.delete(); result &= fileToDelete.delete();
return result; return result;
} }
public static boolean deleteChildren(File parent) { public static boolean deleteChildren(File parent) {
if (parent == null || !parent.exists()) { if (parent == null || !parent.exists()) {
return false; return false;
@ -87,46 +87,49 @@ public final class IOHelper {
} }
} }
} }
return result; return result;
} }
public static void moveFile(File src, File targetDirectory) throws IOException { public static void moveFile(File src, File targetDirectory) throws IOException {
if (!src.renameTo(new File(targetDirectory, src.getName()))) { if (!src.renameTo(new File(targetDirectory, src.getName()))) {
throw new IOException("Failed to move " + src + " to " + targetDirectory); throw new IOException("Failed to move " + src + " to " + targetDirectory);
} }
} }
public static void copyFile(File src, File dest) throws IOException { public static void copyFile(File src, File dest) throws IOException {
FileInputStream fileSrc = new FileInputStream(src); FileInputStream fileSrc = new FileInputStream(src);
FileOutputStream fileDest = new FileOutputStream(dest); FileOutputStream fileDest = new FileOutputStream(dest);
copyInputStream(fileSrc, fileDest); copyInputStream(fileSrc, fileDest);
} }
public static void copyInputStream(InputStream in, OutputStream out) throws IOException { public static void copyInputStream(InputStream in, OutputStream out) throws IOException {
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE]; try {
int len = in.read(buffer); byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
while (len >= 0) { int len = in.read(buffer);
out.write(buffer, 0, len); while (len >= 0) {
len = in.read(buffer); out.write(buffer, 0, len);
len = in.read(buffer);
}
} finally {
in.close();
out.close();
} }
in.close();
out.close();
}
static {
MAX_DIR_NAME_LENGTH = Integer.valueOf(System.getProperty("MaximumDirNameLength","200")).intValue();
MAX_FILE_NAME_LENGTH = Integer.valueOf(System.getProperty("MaximumFileNameLength","64")).intValue();
} }
static {
MAX_DIR_NAME_LENGTH = Integer.valueOf(System.getProperty("MaximumDirNameLength","200")).intValue();
MAX_FILE_NAME_LENGTH = Integer.valueOf(System.getProperty("MaximumFileNameLength","64")).intValue();
}
public static void mkdirs(File dir) throws IOException { public static void mkdirs(File dir) throws IOException {
if (dir.exists()) { if (dir.exists()) {
if (!dir.isDirectory()) { if (!dir.isDirectory()) {
throw new IOException("Failed to create directory '" + dir +"', regular file already existed with that name"); throw new IOException("Failed to create directory '" + dir +"', regular file already existed with that name");
} }
} else { } else {
if (!dir.mkdirs()) { if (!dir.mkdirs()) {
throw new IOException("Failed to create directory '" + dir+"'"); throw new IOException("Failed to create directory '" + dir+"'");

View File

@ -40,7 +40,7 @@ import javax.security.auth.login.LoginException;
* org.apache.activemq.jaas.textfiledn.user properties respectively. NOTE: This * org.apache.activemq.jaas.textfiledn.user properties respectively. NOTE: This
* class will re-read user and group files for every authentication (i.e it does * class will re-read user and group files for every authentication (i.e it does
* live updates of allowed groups and users). * live updates of allowed groups and users).
* *
* @author sepandm@gmail.com (Sepand) * @author sepandm@gmail.com (Sepand)
*/ */
public class TextFileCertificateLoginModule extends CertificateLoginModule { public class TextFileCertificateLoginModule extends CertificateLoginModule {
@ -71,7 +71,7 @@ public class TextFileCertificateLoginModule extends CertificateLoginModule {
/** /**
* Overriding to allow DN authorization based on DNs specified in text * Overriding to allow DN authorization based on DNs specified in text
* files. * files.
* *
* @param certs The certificate the incoming connection provided. * @param certs The certificate the incoming connection provided.
* @return The user's authenticated name or null if unable to authenticate * @return The user's authenticated name or null if unable to authenticate
* the user. * the user.
@ -88,10 +88,8 @@ public class TextFileCertificateLoginModule extends CertificateLoginModule {
Properties users = new Properties(); Properties users = new Properties();
try { try(java.io.FileInputStream in = new java.io.FileInputStream(usersFile)) {
java.io.FileInputStream in = new java.io.FileInputStream(usersFile);
users.load(in); users.load(in);
in.close();
} catch (IOException ioe) { } catch (IOException ioe) {
throw new LoginException("Unable to load user properties file " + usersFile); throw new LoginException("Unable to load user properties file " + usersFile);
} }
@ -112,7 +110,7 @@ public class TextFileCertificateLoginModule extends CertificateLoginModule {
/** /**
* Overriding to allow for group discovery based on text files. * Overriding to allow for group discovery based on text files.
* *
* @param username The name of the user being examined. This is the same * @param username The name of the user being examined. This is the same
* name returned by getUserNameForCertificates. * name returned by getUserNameForCertificates.
* @return A Set of name Strings for groups this user belongs to. * @return A Set of name Strings for groups this user belongs to.

View File

@ -35,18 +35,18 @@ import org.apache.activemq.util.ByteArrayOutputStream;
* This JDBCAdapter inserts and extracts BLOB data using the getBlob()/setBlob() * This JDBCAdapter inserts and extracts BLOB data using the getBlob()/setBlob()
* operations. This is a little more involved since to insert a blob you have * operations. This is a little more involved since to insert a blob you have
* to: * to:
* *
* 1: insert empty blob. 2: select the blob 3: finally update the blob with data * 1: insert empty blob. 2: select the blob 3: finally update the blob with data
* value. * value.
* *
* The databases/JDBC drivers that use this adapter are: * The databases/JDBC drivers that use this adapter are:
* <ul> * <ul>
* <li></li> * <li></li>
* </ul> * </ul>
* *
* @org.apache.xbean.XBean element="blobJDBCAdapter" * @org.apache.xbean.XBean element="blobJDBCAdapter"
* *
* *
*/ */
public class BlobJDBCAdapter extends DefaultJDBCAdapter { public class BlobJDBCAdapter extends DefaultJDBCAdapter {
@ -139,18 +139,15 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
return null; return null;
} }
Blob blob = rs.getBlob(1); Blob blob = rs.getBlob(1);
InputStream is = blob.getBinaryStream();
ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length()); try(InputStream is = blob.getBinaryStream();
int ch; ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length())) {
while ((ch = is.read()) >= 0) { int ch;
os.write(ch); while ((ch = is.read()) >= 0) {
os.write(ch);
}
return os.toByteArray();
} }
is.close();
os.close();
return os.toByteArray();
} finally { } finally {
cleanupExclusiveLock.readLock().unlock(); cleanupExclusiveLock.readLock().unlock();
close(rs); close(rs);

View File

@ -28,15 +28,15 @@ import org.apache.activemq.util.ByteArrayInputStream;
/** /**
* This JDBCAdapter inserts and extracts BLOB data using the * This JDBCAdapter inserts and extracts BLOB data using the
* setBinaryStream()/getBinaryStream() operations. * setBinaryStream()/getBinaryStream() operations.
* *
* The databases/JDBC drivers that use this adapter are: * The databases/JDBC drivers that use this adapter are:
* <ul> * <ul>
* <li>Axion</li> * <li>Axion</li>
* </ul> * </ul>
* *
* @org.apache.xbean.XBean element="streamJDBCAdapter" * @org.apache.xbean.XBean element="streamJDBCAdapter"
* *
* *
*/ */
public class StreamJDBCAdapter extends DefaultJDBCAdapter { public class StreamJDBCAdapter extends DefaultJDBCAdapter {
@ -47,16 +47,13 @@ public class StreamJDBCAdapter extends DefaultJDBCAdapter {
@Override @Override
protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException { protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
try { try (InputStream is = rs.getBinaryStream(index);
InputStream is = rs.getBinaryStream(index); ByteArrayOutputStream os = new ByteArrayOutputStream(1024 * 4)) {
ByteArrayOutputStream os = new ByteArrayOutputStream(1024 * 4);
int ch; int ch;
while ((ch = is.read()) >= 0) { while ((ch = is.read()) >= 0) {
os.write(ch); os.write(ch);
} }
is.close();
os.close();
return os.toByteArray(); return os.toByteArray();
} catch (IOException e) { } catch (IOException e) {

View File

@ -196,10 +196,10 @@ public class PageFile {
public byte[] getDiskBound() throws IOException { public byte[] getDiskBound() throws IOException {
if (diskBound == null && diskBoundLocation != -1) { if (diskBound == null && diskBoundLocation != -1) {
diskBound = new byte[length]; diskBound = new byte[length];
RandomAccessFile file = new RandomAccessFile(tmpFile, "r"); try(RandomAccessFile file = new RandomAccessFile(tmpFile, "r")) {
file.seek(diskBoundLocation); file.seek(diskBoundLocation);
file.read(diskBound); file.read(diskBound);
file.close(); }
diskBoundLocation = -1; diskBoundLocation = -1;
} }
return diskBound; return diskBound;

View File

@ -17,11 +17,12 @@
package org.apache.activemq.store.kahadb.disk.util; package org.apache.activemq.store.kahadb.disk.util;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import org.apache.activemq.util.RecoverableRandomAccessFile;
/** /**
* This class is used to get a benchmark the raw disk performance. * This class is used to get a benchmark the raw disk performance.
*/ */
@ -201,84 +202,88 @@ public class DiskBenchmark {
for (int i = 0; i < data.length; i++) { for (int i = 0; i < data.length; i++) {
data[i] = (byte) ('a' + (i % 26)); data[i] = (byte) ('a' + (i % 26));
} }
rc.size = data.length; rc.size = data.length;
RandomAccessFile raf = new RandomAccessFile(file, "rw");
raf.setLength(size);
// Figure out how many writes we can do in the sample interval. long start;
long start = System.currentTimeMillis(); long now;
long now = System.currentTimeMillis(); int ioCount;
int ioCount = 0;
while (true) { try(RecoverableRandomAccessFile raf = new RecoverableRandomAccessFile(file, "rw")) {
if ((now - start) > sampleInterval) { preallocateDataFile(raf, file.getParentFile());
break; start = System.currentTimeMillis();
} now = System.currentTimeMillis();
raf.seek(0); ioCount = 0;
for (long i = 0; i + data.length < size; i += data.length) {
raf.write(data); // Figure out how many writes we can do in the sample interval.
ioCount++; while (true) {
now = System.currentTimeMillis();
if ((now - start) > sampleInterval) { if ((now - start) > sampleInterval) {
break; break;
} }
raf.seek(0);
for (long i = 0; i + data.length < size; i += data.length) {
raf.write(data);
ioCount++;
now = System.currentTimeMillis();
if ((now - start) > sampleInterval) {
break;
}
}
// Sync to disk so that the we actually write the data to disk..
// otherwise OS buffering might not really do the write.
raf.getChannel().force(!SKIP_METADATA_UPDATE);
} }
// Sync to disk so that the we actually write the data to disk..
// otherwise OS buffering might not really do the write.
raf.getChannel().force(!SKIP_METADATA_UPDATE); raf.getChannel().force(!SKIP_METADATA_UPDATE);
} }
raf.getChannel().force(!SKIP_METADATA_UPDATE);
raf.close();
now = System.currentTimeMillis(); now = System.currentTimeMillis();
rc.size = data.length; rc.size = data.length;
rc.writes = ioCount; rc.writes = ioCount;
rc.writeDuration = (now - start); rc.writeDuration = (now - start);
raf = new RandomAccessFile(file, "rw"); try(RecoverableRandomAccessFile raf = new RecoverableRandomAccessFile(file, "rw")) {
start = System.currentTimeMillis(); start = System.currentTimeMillis();
now = System.currentTimeMillis(); now = System.currentTimeMillis();
ioCount = 0; ioCount = 0;
while (true) { while (true) {
if ((now - start) > sampleInterval) {
break;
}
for (long i = 0; i + data.length < size; i += data.length) {
raf.seek(i);
raf.write(data);
raf.getChannel().force(false);
ioCount++;
now = System.currentTimeMillis();
if ((now - start) > sampleInterval) { if ((now - start) > sampleInterval) {
break; break;
} }
for (long i = 0; i + data.length < size; i += data.length) {
raf.seek(i);
raf.write(data);
raf.getChannel().force(!SKIP_METADATA_UPDATE);
ioCount++;
now = System.currentTimeMillis();
if ((now - start) > sampleInterval) {
break;
}
}
} }
} }
raf.close();
now = System.currentTimeMillis(); now = System.currentTimeMillis();
rc.syncWrites = ioCount; rc.syncWrites = ioCount;
rc.syncWriteDuration = (now - start); rc.syncWriteDuration = (now - start);
raf = new RandomAccessFile(file, "rw"); try(RecoverableRandomAccessFile raf = new RecoverableRandomAccessFile(file, "rw")) {
start = System.currentTimeMillis(); start = System.currentTimeMillis();
now = System.currentTimeMillis(); now = System.currentTimeMillis();
ioCount = 0; ioCount = 0;
while (true) { while (true) {
if ((now - start) > sampleInterval) {
break;
}
raf.seek(0);
for (long i = 0; i + data.length < size; i += data.length) {
raf.seek(i);
raf.readFully(data);
ioCount++;
now = System.currentTimeMillis();
if ((now - start) > sampleInterval) { if ((now - start) > sampleInterval) {
break; break;
} }
raf.seek(0);
for (long i = 0; i + data.length < size; i += data.length) {
raf.seek(i);
raf.readFully(data);
ioCount++;
now = System.currentTimeMillis();
if ((now - start) > sampleInterval) {
break;
}
}
} }
} }
raf.close();
rc.reads = ioCount; rc.reads = ioCount;
rc.readDuration = (now - start); rc.readDuration = (now - start);

View File

@ -83,15 +83,17 @@ public class ProtocolConverter {
private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE); private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);
static { static {
InputStream in = null;
String version = "5.6.0"; String version = "5.6.0";
if ((in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) { try(InputStream in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) {
BufferedReader reader = new BufferedReader(new InputStreamReader(in)); if (in != null) {
try { try(InputStreamReader isr = new InputStreamReader(in);
version = reader.readLine(); BufferedReader reader = new BufferedReader(isr)) {
} catch(Exception e) { version = reader.readLine();
}
} }
}catch(Exception e) {
} }
BROKER_VERSION = version; BROKER_VERSION = version;
} }

View File

@ -294,8 +294,10 @@ public abstract class AbstractJmsClientSystem extends AbstractObjectProperties {
Properties fileProps = new Properties(); Properties fileProps = new Properties();
try { try {
if (configFile != null) { if (configFile != null) {
LOG.info("Loading properties file: " + configFile.getAbsolutePath()); try(FileInputStream inputStream = new FileInputStream(configFile)) {
fileProps.load(new FileInputStream(configFile)); LOG.info("Loading properties file: " + configFile.getAbsolutePath());
fileProps.load(inputStream);
}
} }
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();

View File

@ -376,13 +376,14 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
} }
// try to load file // try to load file
BufferedReader br = new BufferedReader(new FileReader(f));
StringBuffer payload = new StringBuffer(); StringBuffer payload = new StringBuffer();
String tmp = null; try(FileReader fr = new FileReader(f);
while ((tmp = br.readLine()) != null) { BufferedReader br = new BufferedReader(fr)) {
payload.append(tmp); String tmp = null;
while ((tmp = br.readLine()) != null) {
payload.append(tmp);
}
} }
br.close();
jmsTextMessage = getSession().createTextMessage(payload.toString()); jmsTextMessage = getSession().createTextMessage(payload.toString());
return jmsTextMessage; return jmsTextMessage;
} catch (FileNotFoundException ex) { } catch (FileNotFoundException ex) {

View File

@ -195,22 +195,25 @@ public class XmlFilePerfReportWriter extends AbstractPerfReportWriter {
xmlFileWriter.println("<property name='performanceData'>"); xmlFileWriter.println("<property name='performanceData'>");
xmlFileWriter.println("<list>"); xmlFileWriter.println("<list>");
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(tempLogFile))); try (FileInputStream fileInputStream = new FileInputStream(tempLogFile);
String line; InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream);
while ((line = reader.readLine()) != null) { BufferedReader reader = new BufferedReader(inputStreamReader)) {
if (line.startsWith("[TP-DATA]")) {
handleCsvData(REPORT_PLUGIN_THROUGHPUT, line.substring("[TP-DATA]".length())); String line;
parsePerfCsvData("tpdata", line.substring("[TP-DATA]".length())); while ((line = reader.readLine()) != null) {
} else if (line.startsWith("[CPU-DATA]")) { if (line.startsWith("[TP-DATA]")) {
handleCsvData(REPORT_PLUGIN_CPU, line.substring("[CPU-DATA]".length())); handleCsvData(REPORT_PLUGIN_THROUGHPUT, line.substring("[TP-DATA]".length()));
parsePerfCsvData("cpudata", line.substring("[CPU-DATA]".length())); parsePerfCsvData("tpdata", line.substring("[TP-DATA]".length()));
} else if (line.startsWith("[INFO]")) { } else if (line.startsWith("[CPU-DATA]")) {
xmlFileWriter.println("<info>" + line + "</info>"); handleCsvData(REPORT_PLUGIN_CPU, line.substring("[CPU-DATA]".length()));
} else { parsePerfCsvData("cpudata", line.substring("[CPU-DATA]".length()));
xmlFileWriter.println("<error>" + line + "</error>"); } else if (line.startsWith("[INFO]")) {
xmlFileWriter.println("<info>" + line + "</info>");
} else {
xmlFileWriter.println("<error>" + line + "</error>");
}
} }
} }
xmlFileWriter.println("</list>"); xmlFileWriter.println("</list>");
xmlFileWriter.println("</property>"); xmlFileWriter.println("</property>");
} }

View File

@ -3,7 +3,7 @@
//------------------------------------------------------------------------ //------------------------------------------------------------------------
//Licensed under the Apache License, Version 2.0 (the "License"); //Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License. //you may not use this file except in compliance with the License.
//You may obtain a copy of the License at //You may obtain a copy of the License at
//http://www.apache.org/licenses/LICENSE-2.0 //http://www.apache.org/licenses/LICENSE-2.0
//Unless required by applicable law or agreed to in writing, software //Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS, //distributed under the License is distributed on an "AS IS" BASIS,
@ -64,19 +64,19 @@ public class AjaxServlet extends MessageListenerServlet {
byte[] data = jsCache.get(resource); byte[] data = jsCache.get(resource);
if (data == null) { if (data == null) {
InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream(resource); try(InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream(resource)) {
if (in != null) { if (in != null) {
ByteArrayOutputStream out = new ByteArrayOutputStream(); try(ByteArrayOutputStream out = new ByteArrayOutputStream()) {
byte[] buf = new byte[4096]; byte[] buf = new byte[4096];
int len = in.read(buf); int len = in.read(buf);
while (len >= 0) { while (len >= 0) {
out.write(buf, 0, len); out.write(buf, 0, len);
len = in.read(buf); len = in.read(buf);
}
data = out.toByteArray();
jsCache.put(resource, data);
}
} }
in.close();
out.close();
data = out.toByteArray();
jsCache.put(resource, data);
} }
} }