ARTEMIS-4682 Improving performance of reloading JDBC storage

- fixing leaks
- page.exists() would lead to a select(all) and find.
This commit is contained in:
Clebert Suconic 2024-03-11 17:09:20 -04:00 committed by clebertsuconic
parent 009687ef7c
commit c5b81d929d
4 changed files with 51 additions and 51 deletions

View File

@ -67,8 +67,6 @@ public class JDBCSequentialFile implements SequentialFile {
private final JDBCSequentialFileFactory fileFactory;
private final Object writeLock;
private final JDBCSequentialFileFactoryDriver dbDriver;
MpscUnboundedArrayQueue<ScheduledWrite> writeQueue = new MpscUnboundedArrayQueue<>(8192);
@ -89,13 +87,11 @@ public class JDBCSequentialFile implements SequentialFile {
final Executor executor,
final ScheduledExecutorService scheduledExecutorService,
final long syncDelay,
final JDBCSequentialFileFactoryDriver driver,
final Object writeLock) throws SQLException {
final JDBCSequentialFileFactoryDriver driver) throws SQLException {
this.fileFactory = fileFactory;
this.filename = filename;
this.extension = filename.contains(".") ? filename.substring(filename.lastIndexOf(".") + 1, filename.length()) : "";
this.executor = executor;
this.writeLock = writeLock;
this.dbDriver = driver;
this.scheduledExecutorService = scheduledExecutorService;
this.syncDelay = syncDelay;
@ -113,14 +109,10 @@ public class JDBCSequentialFile implements SequentialFile {
@Override
public boolean exists() {
if (isLoaded.get()) return true;
try {
return fileFactory.listFiles(extension).contains(filename);
} catch (Exception e) {
logger.debug(e.getMessage(), e);
// this shouldn't throw a critical IO Error
// as if the destination does not exists (ot table store removed), the table will not exist and
// we may get a SQL Exception
return dbDriver.getFileID(this) >= 0;
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
return false;
}
}
@ -178,7 +170,7 @@ public class JDBCSequentialFile implements SequentialFile {
@Override
public void delete() throws IOException, InterruptedException, ActiveMQException {
try {
synchronized (writeLock) {
synchronized (this) {
if (load()) {
dbDriver.deleteFile(this);
}
@ -193,7 +185,7 @@ public class JDBCSequentialFile implements SequentialFile {
private synchronized int jdbcWrite(byte[] data, IOCallback callback, boolean append) {
try {
logger.debug("Writing {} bytes into {}", data.length, filename);
synchronized (writeLock) {
synchronized (this) {
int noBytes = dbDriver.writeToFile(this, data, append);
seek(append ? writePosition + noBytes : noBytes);
if (logger.isTraceEnabled()) {
@ -360,7 +352,7 @@ public class JDBCSequentialFile implements SequentialFile {
@Override
public synchronized int read(ByteBuffer bytes, final IOCallback callback) throws SQLException {
synchronized (writeLock) {
synchronized (this) {
try {
int read = dbDriver.readFromFile(this, bytes);
readPosition += read;
@ -439,7 +431,7 @@ public class JDBCSequentialFile implements SequentialFile {
@Override
public void renameTo(String newFileName) throws Exception {
synchronized (writeLock) {
synchronized (this) {
try {
dbDriver.renameFile(this, newFileName);
} catch (SQLException e) {
@ -451,7 +443,7 @@ public class JDBCSequentialFile implements SequentialFile {
@Override
public SequentialFile cloneFile() {
try {
JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, scheduledExecutorService, syncDelay, dbDriver, writeLock);
JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, scheduledExecutorService, syncDelay, dbDriver);
clone.setWritePosition(this.writePosition);
return clone;
} catch (Exception e) {
@ -464,7 +456,7 @@ public class JDBCSequentialFile implements SequentialFile {
public void copyTo(SequentialFile cloneFile) throws Exception {
JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile;
try {
synchronized (writeLock) {
synchronized (this) {
logger.trace("JDBC Copying File. From: {} To: {}", this, cloneFile);
clone.open();
dbDriver.copyFileData(this, clone);

View File

@ -20,9 +20,6 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
@ -32,11 +29,11 @@ import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
@ -44,14 +41,14 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
private boolean started;
private final Set<JDBCSequentialFile> files = new ConcurrentHashSet<>();
private final Executor executor;
private final Map<String, Object> fileLocks = new ConcurrentHashMap<>();
private JDBCSequentialFileFactoryDriver dbDriver;
private volatile int countOpen = 0;
private static final AtomicIntegerFieldUpdater<JDBCSequentialFileFactory> countOpenUpdater = AtomicIntegerFieldUpdater.newUpdater(JDBCSequentialFileFactory.class, "countOpen");
private final IOCriticalErrorListener criticalErrorListener;
private final long syncDelay;
@ -133,9 +130,8 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
@Override
public SequentialFile createSequentialFile(String fileName) {
try {
fileLocks.putIfAbsent(fileName, new Object());
JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, executor, scheduledExecutorService, syncDelay, dbDriver, fileLocks.get(fileName));
files.add(file);
JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, executor, scheduledExecutorService, syncDelay, dbDriver);
countOpenUpdater.incrementAndGet(this);
return file;
} catch (Exception e) {
criticalErrorListener.onIOException(e, "Error whilst creating JDBC file", null);
@ -144,11 +140,11 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
}
public void sequentialFileClosed(SequentialFile file) {
files.remove(file);
countOpenUpdater.decrementAndGet(this);
}
public int getNumberOfOpenFiles() {
return files.size();
return countOpenUpdater.get(this);
}
@Override
@ -261,13 +257,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
@Override
public void flush() {
for (SequentialFile file : files) {
try {
file.sync();
} catch (Exception e) {
criticalErrorListener.onIOException(e, "Error during JDBC file sync.", file.getFileName());
}
}
}
public synchronized void destroy() throws SQLException {

View File

@ -106,7 +106,7 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException
*/
public void openFile(JDBCSequentialFile file) throws SQLException {
final long fileId = fileExists(file);
final long fileId = getFileID(file);
if (fileId < 0) {
createFile(file);
} else {
@ -126,7 +126,7 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @return
* @throws SQLException
*/
public long fileExists(JDBCSequentialFile file) throws SQLException {
public long getFileID(JDBCSequentialFile file) throws SQLException {
try (Connection connection = connectionProvider.getConnection()) {
try (PreparedStatement selectFileByFileName = connection.prepareStatement(this.selectFileByFileName)) {
connection.setAutoCommit(false);

View File

@ -49,9 +49,12 @@ public class RealServerDatabasePagingTest extends ParameterDBTestBase {
private static final int MAX_MESSAGES = Integer.parseInt(testProperty(TEST_NAME, "MAX_MESSAGES", "200"));
private static final int MESSAGE_SIZE = Integer.parseInt(testProperty(TEST_NAME, "MESSAGE_SIZE", "1000"));
private static final int SOAK_MAX_MESSAGES = Integer.parseInt(testProperty(TEST_NAME, "SOAK_MAX_MESSAGES", "100000"));
private static final int COMMIT_INTERVAL = Integer.parseInt(testProperty(TEST_NAME, "COMMIT_INTERVAL", "100"));
private static final int MESSAGE_SIZE = Integer.parseInt(testProperty(TEST_NAME, "MESSAGE_SIZE", "1000"));
private static final int SOAK_MESSAGE_SIZE = Integer.parseInt(testProperty(TEST_NAME, "SOAK_MESSAGE_SIZE", "1000"));
private static final int COMMIT_INTERVAL = Integer.parseInt(testProperty(TEST_NAME, "COMMIT_INTERVAL", "1000"));
Process serverProcess;
@ -69,29 +72,37 @@ public class RealServerDatabasePagingTest extends ParameterDBTestBase {
@Test
public void testPaging() throws Exception {
testPaging("CORE");
testPaging("AMQP");
testPaging("OPENWIRE");
testPaging("CORE", MAX_MESSAGES, MESSAGE_SIZE);
testPaging("AMQP", MAX_MESSAGES, MESSAGE_SIZE);
testPaging("OPENWIRE", MAX_MESSAGES, MESSAGE_SIZE);
}
public void testPaging(String protocol) throws Exception {
@Test
public void testSoakPaging() throws Exception {
testPaging("AMQP", SOAK_MAX_MESSAGES, SOAK_MESSAGE_SIZE);
}
private void testPaging(String protocol, int messages, int messageSize) throws Exception {
logger.info("performing paging test on protocol={} and db={}", protocol, database);
final String queueName = "QUEUE_" + RandomUtil.randomString() + "_" + protocol + "_" + database;
ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
byte[] messageLoad = RandomUtil.randomBytes(messageSize);
try (Connection connection = connectionFactory.createConnection()) {
byte[] messageLoad = new byte[MESSAGE_SIZE];
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < MAX_MESSAGES; i++) {
for (int i = 0; i < messages; i++) {
BytesMessage message = session.createBytesMessage();
message.writeBytes(messageLoad);
message.setIntProperty("i", i);
producer.send(message);
if (i % COMMIT_INTERVAL == 0) {
logger.info("Sent {} messages", i);
session.commit();
}
}
@ -107,15 +118,23 @@ public class RealServerDatabasePagingTest extends ParameterDBTestBase {
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < MAX_MESSAGES; i++) {
for (int i = 0; i < messages; i++) {
BytesMessage message = (BytesMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getIntProperty("i"));
Assert.assertEquals(MESSAGE_SIZE, message.getBodyLength());
Assert.assertEquals(messageSize, message.getBodyLength());
byte[] bytesOutput = new byte[(int)message.getBodyLength()];
message.readBytes(bytesOutput);
Assert.assertArrayEquals(messageLoad, bytesOutput);
if (i % COMMIT_INTERVAL == 0) {
logger.info("Received {}", i);
session.commit();
}
}
session.commit();
Assert.assertNull(consumer.receiveNoWait());
}