This commit is contained in:
Clebert Suconic 2018-10-10 18:50:12 -04:00
commit d441e7595d
13 changed files with 199 additions and 13 deletions

View File

@ -155,12 +155,12 @@ public class JDBCSequentialFile implements SequentialFile {
}
}
private synchronized int internalWrite(byte[] data, IOCallback callback) {
private synchronized int internalWrite(byte[] data, IOCallback callback, boolean append) {
try {
open();
synchronized (writeLock) {
int noBytes = dbDriver.writeToFile(this, data);
seek(noBytes);
int noBytes = dbDriver.writeToFile(this, data, append);
seek(append ? writePosition + noBytes : noBytes);
if (logger.isTraceEnabled()) {
logger.trace("Write: ID: " + this.getId() + " FileName: " + this.getFileName() + size());
}
@ -177,18 +177,22 @@ public class JDBCSequentialFile implements SequentialFile {
}
public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) {
return internalWrite(buffer, callback, true);
}
public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback, boolean append) {
byte[] data = new byte[buffer.readableBytes()];
buffer.readBytes(data);
return internalWrite(data, callback);
return internalWrite(data, callback, append);
}
private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) {
return internalWrite(buffer.array(), callback);
return internalWrite(buffer.array(), callback, true);
}
private void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback) {
private void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback, boolean append) {
executor.execute(() -> {
internalWrite(bytes, callback);
internalWrite(bytes, callback, append);
});
}
@ -199,13 +203,17 @@ public class JDBCSequentialFile implements SequentialFile {
}
synchronized void seek(long noBytes) {
writePosition += noBytes;
writePosition = noBytes;
}
public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback, boolean append) throws Exception {
// We ignore sync since we schedule writes straight away.
scheduleWrite(bytes, callback, append);
}
@Override
public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
// We ignore sync since we schedule writes straight away.
scheduleWrite(bytes, callback);
write(bytes, sync, callback, true);
}
@Override
@ -217,7 +225,7 @@ public class JDBCSequentialFile implements SequentialFile {
public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception {
ActiveMQBuffer data = ActiveMQBuffers.fixedBuffer(bytes.getEncodeSize());
bytes.encode(data);
scheduleWrite(data, callback);
write(data, sync, callback, true);
}
@Override

View File

@ -246,6 +246,10 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
}
}
public int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
return writeToFile(file, data, true);
}
/**
* Persists data to this files associated database mapping.
*
@ -254,7 +258,7 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @return
* @throws SQLException
*/
public int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
public int writeToFile(JDBCSequentialFile file, byte[] data, boolean append) throws SQLException {
synchronized (connection) {
connection.setAutoCommit(false);
appendToLargeObject.setLong(1, file.getId());
@ -266,7 +270,12 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
if (blob == null) {
blob = connection.createBlob();
}
bytesWritten = blob.setBytes(blob.length() + 1, data);
if (append) {
bytesWritten = blob.setBytes(blob.length() + 1, data);
} else {
blob.truncate(0);
bytesWritten = blob.setBytes(1, data);
}
rs.updateBlob(1, blob);
rs.updateRow();
}

View File

@ -201,6 +201,32 @@ public class JDBCSequentialFileFactoryTest {
checkData(file, src);
}
@Test
public void testWriteToFile() throws Exception {
JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt");
file.open();
ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(1);
src.writeByte((byte)7);
file.internalWrite(src, null);
checkData(file, src);
assertEquals(1, file.size());
file.close();
file = (JDBCSequentialFile) factory.createSequentialFile("test.txt");
file.open();
int bufferSize = 1024;
src = ActiveMQBuffers.fixedBuffer(bufferSize);
for (int i = 0; i < bufferSize; i++) {
src.writeByte((byte)i);
}
file.internalWrite(src, null, false);
checkData(file, src);
assertEquals(bufferSize, file.size());
}
@Test
public void testCopyFile() throws Exception {
JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt");

View File

@ -184,4 +184,6 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
* This method will re-enable cleanup of pages. Notice that it will also start cleanup threads.
*/
void enableCleanup();
void destroy() throws Exception;
}

View File

@ -49,6 +49,8 @@ public interface PagingStoreFactory {
SequentialFileFactory newFileFactory(SimpleString address) throws Exception;
void removeFileFactory(SequentialFileFactory fileFactory) throws Exception;
void injectMonitor(FileStoreMonitor monitor) throws Exception;
default ScheduledExecutorService getScheduledExecutor() {

View File

@ -317,6 +317,7 @@ public final class PagingManagerImpl implements PagingManager {
PagingStore store = stores.remove(storeName);
if (store != null) {
store.stop();
store.destroy();
}
} finally {
syncLock.readLock().unlock();

View File

@ -18,7 +18,9 @@ package org.apache.activemq.artemis.core.paging.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@ -94,6 +96,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
private final IOCriticalErrorListener criticalErrorListener;
private final Map<SequentialFileFactory, String> factoryToTableName;
public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
final StorageManager storageManager,
final long syncTimeout,
@ -108,6 +112,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
this.syncTimeout = syncTimeout;
this.dbConf = dbConf;
this.criticalErrorListener = critialErrorListener;
this.factoryToTableName = new HashMap<>();
start();
}
@ -180,6 +185,32 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
return factory;
}
@Override
public synchronized void removeFileFactory(SequentialFileFactory fileFactory) throws Exception {
((JDBCSequentialFileFactory)fileFactory).destroy();
String tableName = factoryToTableName.remove(fileFactory);
if (tableName != null) {
SimpleString removeTableName = SimpleString.toSimpleString(tableName);
JDBCSequentialFile directoryList = (JDBCSequentialFile) pagingFactoryFileFactory.createSequentialFile(DIRECTORY_NAME);
directoryList.open();
int size = ((Long) directoryList.size()).intValue();
ActiveMQBuffer buffer = readActiveMQBuffer(directoryList, size);
ActiveMQBuffer writeBuffer = ActiveMQBuffers.fixedBuffer(size);
while (buffer.readableBytes() > 0) {
SimpleString table = buffer.readSimpleString();
if (!removeTableName.equals(table)) {
writeBuffer.writeSimpleString(table);
}
}
directoryList.write(writeBuffer, true, null, false);
directoryList.close();
}
}
@Override
public void setPagingManager(final PagingManager pagingManager) {
this.pagingManager = pagingManager;
@ -249,6 +280,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
if (jdbcNetworkTimeout >= 0) {
fileFactory.setNetworkTimeout(this.executorFactory.getExecutor(), jdbcNetworkTimeout);
}
factoryToTableName.put(fileFactory, directoryName);
return fileFactory;
}

View File

@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
@ -173,6 +174,14 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
return factory;
}
@Override
public synchronized void removeFileFactory(SequentialFileFactory fileFactory) throws Exception {
File directory = fileFactory.getDirectory();
if (directory.exists()) {
FileUtil.deleteDirectory(directory);
}
}
@Override
public void setPagingManager(final PagingManager pagingManager) {
this.pagingManager = pagingManager;

View File

@ -967,6 +967,13 @@ public class PagingStoreImpl implements PagingStore {
return;
}
@Override
public void destroy() throws Exception {
if (fileFactory != null) {
storeFactory.removeFileFactory(fileFactory);
}
}
private static class FinishPageMessageOperation implements TransactionOperation {
private final PageTransactionInfo pageTransaction;

View File

@ -2595,6 +2595,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
JournalLoadInformation[] journalInfo = loadJournals();
removeExtraAddressStores();
final ServerInfo dumper = new ServerInfo(this, pagingManager);
long dumpInfoInterval = configuration.getServerDumpInterval();
@ -3465,6 +3467,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
private void removeExtraAddressStores() throws Exception {
SimpleString[] storeNames = pagingManager.getStoreNames();
if (storeNames != null && storeNames.length > 0) {
for (SimpleString storeName : storeNames) {
if (getAddressInfo(storeName) == null) {
pagingManager.deletePageStore(storeName);
}
}
}
}
private final class ActivationThread extends Thread {
final Runnable runnable;

View File

@ -6220,6 +6220,75 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
}
@Test
public void testPagingStoreDestroyed() throws Exception {
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
final int numberOfMessages = 5000;
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
byte[] body = new byte[MESSAGE_SIZE];
ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= MESSAGE_SIZE; j++) {
bb.put(getSamplebyte(j));
}
for (int i = 0; i < numberOfMessages; i++) {
message = session.createMessage(true);
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
bodyLocal.writeBytes(body);
producer.send(message);
if (i % 1000 == 0) {
session.commit();
}
}
session.commit();
producer.close();
assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
assertTrue(server.getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging());
session.deleteQueue(PagingTest.ADDRESS);
session.close();
sf.close();
locator.close();
locator = null;
sf = null;
assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
// Ensure pagingStore is physically deleted
server.getPagingManager().reloadStores();
assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
server.stop();
server.start();
assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
// Ensure pagingStore is physically deleted
server.getPagingManager().reloadStores();
assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
server.stop();
}
@Override
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
Configuration configuration = super.createDefaultConfig(serverID, netty);

View File

@ -469,5 +469,9 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
public boolean checkReleasedMemory() {
return true;
}
@Override
public void destroy() throws Exception {
}
}
}

View File

@ -802,6 +802,10 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
return factory;
}
@Override
public void removeFileFactory(SequentialFileFactory fileFactory) throws Exception {
}
@Override
public PagingStore newStore(final SimpleString destinationName, final AddressSettings addressSettings) {
return null;