ARTEMIS-1155 SequentialFiles leaking on JDBCSequentialFileFactory

This commit is contained in:
Clebert Suconic 2017-05-09 18:24:21 -04:00 committed by Justin Bertram
parent ec49c4310b
commit f328c24b94
4 changed files with 46 additions and 37 deletions

View File

@ -99,7 +99,7 @@ public class JDBCSequentialFile implements SequentialFile {
} }
@Override @Override
public synchronized void open() throws Exception { public void open() throws Exception {
try { try {
if (!isOpen) { if (!isOpen) {
synchronized (writeLock) { synchronized (writeLock) {
@ -151,12 +151,14 @@ public class JDBCSequentialFile implements SequentialFile {
} }
} }
private synchronized int internalWrite(byte[] data, IOCallback callback) throws Exception { private synchronized int internalWrite(byte[] data, IOCallback callback) {
try { try {
synchronized (writeLock) { synchronized (writeLock) {
int noBytes = dbDriver.writeToFile(this, data); int noBytes = dbDriver.writeToFile(this, data);
seek(noBytes); seek(noBytes);
System.out.println("Write: ID: " + this.getId() + " FileName: " + this.getFileName() + size()); if (logger.isTraceEnabled()) {
logger.trace("Write: ID: " + this.getId() + " FileName: " + this.getFileName() + size());
}
if (callback != null) if (callback != null)
callback.done(); callback.done();
return noBytes; return noBytes;
@ -169,42 +171,25 @@ public class JDBCSequentialFile implements SequentialFile {
return 0; return 0;
} }
public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) throws Exception { public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) {
byte[] data = new byte[buffer.readableBytes()]; byte[] data = new byte[buffer.readableBytes()];
buffer.readBytes(data); buffer.readBytes(data);
return internalWrite(data, callback); return internalWrite(data, callback);
} }
private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) throws Exception { private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) {
return internalWrite(buffer.array(), callback); return internalWrite(buffer.array(), callback);
} }
private void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback) { private void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback) {
executor.execute(new Runnable() { executor.execute(() -> {
@Override internalWrite(bytes, callback);
public void run() {
try {
internalWrite(bytes, callback);
} catch (Exception e) {
logger.error(e);
// internalWrite will notify the CriticalIOErrorListener
}
}
}); });
} }
private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) { private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) {
final SequentialFile file = this; executor.execute(() -> {
executor.execute(new Runnable() { internalWrite(bytes, callback);
@Override
public void run() {
try {
internalWrite(bytes, callback);
} catch (Exception e) {
logger.error(e);
fileFactory.onIOError(e, "Error on JDBC file sync", file);
}
}
}); });
} }
@ -292,19 +277,16 @@ public class JDBCSequentialFile implements SequentialFile {
} }
@Override @Override
public synchronized void close() throws Exception { public void close() throws Exception {
isOpen = false; isOpen = false;
sync();
fileFactory.sequentialFileClosed(this);
} }
@Override @Override
public void sync() throws IOException { public void sync() throws IOException {
final SimpleWaitIOCallback callback = new SimpleWaitIOCallback(); final SimpleWaitIOCallback callback = new SimpleWaitIOCallback();
executor.execute(new Runnable() { executor.execute(callback::done);
@Override
public void run() {
callback.done();
}
});
try { try {
callback.waitCompletion(); callback.waitCompletion();

View File

@ -21,10 +21,10 @@ import java.io.File;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent { public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
@ -42,7 +43,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
private boolean started; private boolean started;
private final List<JDBCSequentialFile> files = new ArrayList<>(); private final Set<JDBCSequentialFile> files = new ConcurrentHashSet<>();
private final Executor executor; private final Executor executor;
@ -155,6 +156,14 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
return null; return null;
} }
public void sequentialFileClosed(SequentialFile file) {
files.remove(file);
}
public int getNumberOfOpenFiles() {
return files.size();
}
@Override @Override
public int getMaxIO() { public int getMaxIO() {
return 1; return 1;

View File

@ -117,6 +117,10 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
} }
} }
void removeFile(JDBCSequentialFile file) {
}
/** /**
* Checks to see if a file with filename and extension exists. If so returns the ID of the file or returns -1. * Checks to see if a file with filename and extension exists. If so returns the ID of the file or returns -1.
* *

View File

@ -20,11 +20,12 @@ import java.nio.ByteBuffer;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -41,6 +42,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.apache.derby.jdbc.EmbeddedDriver; import org.apache.derby.jdbc.EmbeddedDriver;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -59,9 +61,11 @@ public class JDBCSequentialFileFactoryTest {
private JDBCSequentialFileFactory factory; private JDBCSequentialFileFactory factory;
private ExecutorService executor;
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
Executor executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()); executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
String connectionUrl = "jdbc:derby:target/data;create=true"; String connectionUrl = "jdbc:derby:target/data;create=true";
String tableName = "FILES"; String tableName = "FILES";
@ -75,6 +79,7 @@ public class JDBCSequentialFileFactoryTest {
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
executor.shutdown();
factory.destroy(); factory.destroy();
} }
@ -94,6 +99,8 @@ public class JDBCSequentialFileFactoryTest {
@Test @Test
public void testCreateFiles() throws Exception { public void testCreateFiles() throws Exception {
int noFiles = 100; int noFiles = 100;
List<SequentialFile> files = new LinkedList<>();
Set<String> fileNames = new HashSet<>(); Set<String> fileNames = new HashSet<>();
for (int i = 0; i < noFiles; i++) { for (int i = 0; i < noFiles; i++) {
String fileName = UUID.randomUUID().toString() + ".txt"; String fileName = UUID.randomUUID().toString() + ".txt";
@ -101,10 +108,17 @@ public class JDBCSequentialFileFactoryTest {
SequentialFile file = factory.createSequentialFile(fileName); SequentialFile file = factory.createSequentialFile(fileName);
// We create files on Open // We create files on Open
file.open(); file.open();
files.add(file);
} }
List<String> queryFileNames = factory.listFiles("txt"); List<String> queryFileNames = factory.listFiles("txt");
assertTrue(queryFileNames.containsAll(fileNames)); assertTrue(queryFileNames.containsAll(fileNames));
for (SequentialFile file: files) {
file.close();
}
Assert.assertEquals(0, factory.getNumberOfOpenFiles());
} }
@Test @Test