diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java index bebff6cbe1..db882acef6 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java @@ -50,7 +50,7 @@ import org.apache.commons.logging.LogFactory; * * @version $Revision: 1.1.1.1 $ */ -public final class AsyncDataManager { +public class AsyncDataManager { public static final int CONTROL_RECORD_MAX_LENGTH = 1024; public static final int ITEM_HEAD_RESERVED_SPACE = 21; @@ -75,28 +75,28 @@ public final class AsyncDataManager { protected final Map inflightWrites = new ConcurrentHashMap(); - File directory = new File(DEFAULT_DIRECTORY); - File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY); - String filePrefix = DEFAULT_FILE_PREFIX; - ControlFile controlFile; - boolean started; - boolean useNio = true; + protected File directory = new File(DEFAULT_DIRECTORY); + protected File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY); + protected String filePrefix = DEFAULT_FILE_PREFIX; + protected ControlFile controlFile; + protected boolean started; + protected boolean useNio = true; - private int maxFileLength = DEFAULT_MAX_FILE_LENGTH; - private int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - 1024 * 512; + protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH; + protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - 1024 * 512; - private DataFileAppender appender; - private DataFileAccessorPool accessorPool = new DataFileAccessorPool(this); + protected DataFileAppender appender; + protected DataFileAccessorPool accessorPool = new DataFileAccessorPool(this); - private Map fileMap = new HashMap(); - private Map fileByFileMap = new LinkedHashMap(); - private DataFile currentWriteFile; + protected Map fileMap = new HashMap(); + protected Map fileByFileMap = new LinkedHashMap(); + protected DataFile currentWriteFile; - private Location mark; - private final AtomicReference lastAppendLocation = new AtomicReference(); - private Runnable cleanupTask; - private final AtomicLong storeSize; - private boolean archiveDataLogs; + protected Location mark; + protected final AtomicReference lastAppendLocation = new AtomicReference(); + protected Runnable cleanupTask; + protected final AtomicLong storeSize; + protected boolean archiveDataLogs; public AsyncDataManager(AtomicLong storeSize) { this.storeSize=storeSize; @@ -194,7 +194,7 @@ public final class AsyncDataManager { Scheduler.executePeriodically(cleanupTask, 1000 * 30); } - private Location recoveryCheck(DataFile dataFile, Location location) throws IOException { + protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException { if (location == null) { location = new Location(); location.setDataFileId(dataFile.getDataFileId()); @@ -213,7 +213,7 @@ public final class AsyncDataManager { return location; } - private void unmarshallState(ByteSequence sequence) throws IOException { + protected void unmarshallState(ByteSequence sequence) throws IOException { ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength()); DataInputStream dis = new DataInputStream(bais); if (dis.readBoolean()) { @@ -596,7 +596,7 @@ public final class AsyncDataManager { storeState(sync); } - private synchronized void storeState(boolean sync) throws IOException { + protected synchronized void storeState(boolean sync) throws IOException { ByteSequence state = marshallState(); appender.storeItem(state, Location.MARK_TYPE, sync); controlFile.store(state, sync); diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java index 43e9c83ff9..06a6962c8c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java @@ -28,14 +28,14 @@ import org.apache.activemq.util.LinkedNode; * * @version $Revision: 1.1.1.1 $ */ -class DataFile extends LinkedNode implements Comparable { +public class DataFile extends LinkedNode implements Comparable { - private final File file; - private final Integer dataFileId; - private final int preferedSize; + protected final File file; + protected final Integer dataFileId; + protected final int preferedSize; - private int length; - private int referenceCount; + protected int length; + protected int referenceCount; DataFile(File file, int number, int preferedSize) { this.file = file; diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java new file mode 100644 index 0000000000..3203527ef5 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.async; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.activemq.thread.Scheduler; +import org.apache.activemq.util.ByteSequence; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * An AsyncDataManager that works in read only mode against multiple data directories. + * Useful for reading back archived data files. + */ +public class ReadOnlyAsyncDataManager extends AsyncDataManager { + + private static final Log LOG = LogFactory.getLog(ReadOnlyAsyncDataManager.class); + private final ArrayList dirs; + + public ReadOnlyAsyncDataManager(final ArrayList dirs) { + this.dirs = dirs; + } + + @SuppressWarnings("unchecked") + public synchronized void start() throws IOException { + if (started) { + return; + } + + started = true; + + ArrayList files = new ArrayList(); + for (File directory : dirs) { + final File d = directory; + File[] f = d.listFiles(new FilenameFilter() { + public boolean accept(File dir, String n) { + return dir.equals(d) && n.startsWith(filePrefix); + } + }); + for (int i = 0; i < f.length; i++) { + files.add(f[i]); + } + } + + for (File file : files) { + try { + String n = file.getName(); + String numStr = n.substring(filePrefix.length(), n.length()); + int num = Integer.parseInt(numStr); + DataFile dataFile = new ReadOnlyDataFile(file, num, preferedFileLength); + fileMap.put(dataFile.getDataFileId(), dataFile); + storeSize.addAndGet(dataFile.getLength()); + } catch (NumberFormatException e) { + // Ignore file that do not match the pattern. + } + } + + // Sort the list so that we can link the DataFiles together in the + // right order. + List dataFiles = new ArrayList(fileMap.values()); + Collections.sort(dataFiles); + currentWriteFile = null; + for (DataFile df : dataFiles) { + if (currentWriteFile != null) { + currentWriteFile.linkAfter(df); + } + currentWriteFile = df; + fileByFileMap.put(df.getFile(), df); + } + + // Need to check the current Write File to see if there was a partial + // write to it. + if (currentWriteFile != null) { + + // See if the lastSyncedLocation is valid.. + Location l = lastAppendLocation.get(); + if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) { + l = null; + } + } + } + + public synchronized void close() throws IOException { + if (!started) { + return; + } + accessorPool.close(); + fileMap.clear(); + fileByFileMap.clear(); + started = false; + } + + + public Location getFirstLocation() throws IllegalStateException, IOException { + if( currentWriteFile == null ) { + return null; + } + + DataFile first = (DataFile)currentWriteFile.getHeadNode(); + Location cur = new Location(); + cur.setDataFileId(first.getDataFileId()); + cur.setOffset(0); + cur.setSize(0); + return getNextLocation(cur); + } + + @Override + public synchronized boolean delete() throws IOException { + throw new RuntimeException("Cannot delete a ReadOnlyAsyncDataManager"); + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java new file mode 100644 index 0000000000..9eac7c1ad5 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.async; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; + +import org.apache.activemq.util.IOHelper; + +/** + * Allows you to open a data file in read only mode. Useful when working with + * archived data files. + */ +public class ReadOnlyDataFile extends DataFile { + + ReadOnlyDataFile(File file, int number, int preferedSize) { + super(file, number, preferedSize); + } + + + public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException { + RandomAccessFile rc = new RandomAccessFile(file, "r"); + // When we start to write files size them up so that the OS has a chance + // to allocate the file contigously. + if (appender) { + if (length < preferedSize) { + rc.setLength(preferedSize); + } + } + return rc; + } + + public void closeRandomAccessFile(RandomAccessFile file) throws IOException { + file.close(); + } + + public synchronized boolean delete() throws IOException { + throw new RuntimeException("Not valid on a read only file."); + } + + public synchronized void move(File targetDirectory) throws IOException{ + throw new RuntimeException("Not valid on a read only file."); + } + +}