diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java index e2da151739..a5f38d700c 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java @@ -99,7 +99,7 @@ public class JDBCSequentialFile implements SequentialFile { } @Override - public synchronized void open() throws Exception { + public void open() throws Exception { try { if (!isOpen) { synchronized (writeLock) { @@ -151,12 +151,14 @@ public class JDBCSequentialFile implements SequentialFile { } } - private synchronized int internalWrite(byte[] data, IOCallback callback) throws Exception { + private synchronized int internalWrite(byte[] data, IOCallback callback) { try { synchronized (writeLock) { int noBytes = dbDriver.writeToFile(this, data); seek(noBytes); - System.out.println("Write: ID: " + this.getId() + " FileName: " + this.getFileName() + size()); + if (logger.isTraceEnabled()) { + logger.trace("Write: ID: " + this.getId() + " FileName: " + this.getFileName() + size()); + } if (callback != null) callback.done(); return noBytes; @@ -169,42 +171,25 @@ public class JDBCSequentialFile implements SequentialFile { return 0; } - public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) throws Exception { + public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) { byte[] data = new byte[buffer.readableBytes()]; buffer.readBytes(data); return internalWrite(data, callback); } - private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) throws Exception { + private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) { return internalWrite(buffer.array(), callback); } private void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback) { - executor.execute(new Runnable() { - @Override - public void run() { - try { - internalWrite(bytes, callback); - } catch (Exception e) { - logger.error(e); - // internalWrite will notify the CriticalIOErrorListener - } - } + executor.execute(() -> { + internalWrite(bytes, callback); }); } private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) { - final SequentialFile file = this; - executor.execute(new Runnable() { - @Override - public void run() { - try { - internalWrite(bytes, callback); - } catch (Exception e) { - logger.error(e); - fileFactory.onIOError(e, "Error on JDBC file sync", file); - } - } + executor.execute(() -> { + internalWrite(bytes, callback); }); } @@ -292,19 +277,16 @@ public class JDBCSequentialFile implements SequentialFile { } @Override - public synchronized void close() throws Exception { + public void close() throws Exception { isOpen = false; + sync(); + fileFactory.sequentialFileClosed(this); } @Override public void sync() throws IOException { final SimpleWaitIOCallback callback = new SimpleWaitIOCallback(); - executor.execute(new Runnable() { - @Override - public void run() { - callback.done(); - } - }); + executor.execute(callback::done); try { callback.waitCompletion(); diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java index ae2e793666..48cb638528 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java @@ -21,10 +21,10 @@ import java.io.File; import java.nio.ByteBuffer; import java.sql.Connection; import java.sql.SQLException; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; @@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.jboss.logging.Logger; public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent { @@ -42,7 +43,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM private boolean started; - private final List files = new ArrayList<>(); + private final Set files = new ConcurrentHashSet<>(); private final Executor executor; @@ -155,6 +156,14 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM return null; } + public void sequentialFileClosed(SequentialFile file) { + files.remove(file); + } + + public int getNumberOfOpenFiles() { + return files.size(); + } + @Override public int getMaxIO() { return 1; diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java index a901f6a3b0..822e579d84 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java @@ -117,6 +117,10 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { } } + void removeFile(JDBCSequentialFile file) { + + } + /** * Checks to see if a file with filename and extension exists. If so returns the ID of the file or returns -1. * diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java index b04b74ff0f..08008701fa 100644 --- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java +++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java @@ -20,11 +20,12 @@ import java.nio.ByteBuffer; import java.sql.DriverManager; import java.sql.SQLException; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -41,6 +42,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.apache.derby.jdbc.EmbeddedDriver; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -59,9 +61,11 @@ public class JDBCSequentialFileFactoryTest { private JDBCSequentialFileFactory factory; + private ExecutorService executor; + @Before public void setup() throws Exception { - Executor executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()); + executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()); String connectionUrl = "jdbc:derby:target/data;create=true"; String tableName = "FILES"; @@ -75,6 +79,7 @@ public class JDBCSequentialFileFactoryTest { @After public void tearDown() throws Exception { + executor.shutdown(); factory.destroy(); } @@ -94,6 +99,8 @@ public class JDBCSequentialFileFactoryTest { @Test public void testCreateFiles() throws Exception { int noFiles = 100; + List files = new LinkedList<>(); + Set fileNames = new HashSet<>(); for (int i = 0; i < noFiles; i++) { String fileName = UUID.randomUUID().toString() + ".txt"; @@ -101,10 +108,17 @@ public class JDBCSequentialFileFactoryTest { SequentialFile file = factory.createSequentialFile(fileName); // We create files on Open file.open(); + files.add(file); } List queryFileNames = factory.listFiles("txt"); assertTrue(queryFileNames.containsAll(fileNames)); + + for (SequentialFile file: files) { + file.close(); + } + + Assert.assertEquals(0, factory.getNumberOfOpenFiles()); } @Test