mirror of https://github.com/apache/activemq.git
Adding a ReadOnlyAsyncDataManager so that you can access a set of data files in a read only way.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@618082 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3ac0537e3e
commit
ef0e1bac6e
|
@ -50,7 +50,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
*
|
||||
* @version $Revision: 1.1.1.1 $
|
||||
*/
|
||||
public final class AsyncDataManager {
|
||||
public class AsyncDataManager {
|
||||
|
||||
public static final int CONTROL_RECORD_MAX_LENGTH = 1024;
|
||||
public static final int ITEM_HEAD_RESERVED_SPACE = 21;
|
||||
|
@ -75,28 +75,28 @@ public final class AsyncDataManager {
|
|||
|
||||
protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
|
||||
|
||||
File directory = new File(DEFAULT_DIRECTORY);
|
||||
File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY);
|
||||
String filePrefix = DEFAULT_FILE_PREFIX;
|
||||
ControlFile controlFile;
|
||||
boolean started;
|
||||
boolean useNio = true;
|
||||
protected File directory = new File(DEFAULT_DIRECTORY);
|
||||
protected File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY);
|
||||
protected String filePrefix = DEFAULT_FILE_PREFIX;
|
||||
protected ControlFile controlFile;
|
||||
protected boolean started;
|
||||
protected boolean useNio = true;
|
||||
|
||||
private int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
|
||||
private int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - 1024 * 512;
|
||||
protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
|
||||
protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - 1024 * 512;
|
||||
|
||||
private DataFileAppender appender;
|
||||
private DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
|
||||
protected DataFileAppender appender;
|
||||
protected DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
|
||||
|
||||
private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
|
||||
private Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
|
||||
private DataFile currentWriteFile;
|
||||
protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
|
||||
protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
|
||||
protected DataFile currentWriteFile;
|
||||
|
||||
private Location mark;
|
||||
private final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
|
||||
private Runnable cleanupTask;
|
||||
private final AtomicLong storeSize;
|
||||
private boolean archiveDataLogs;
|
||||
protected Location mark;
|
||||
protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
|
||||
protected Runnable cleanupTask;
|
||||
protected final AtomicLong storeSize;
|
||||
protected boolean archiveDataLogs;
|
||||
|
||||
public AsyncDataManager(AtomicLong storeSize) {
|
||||
this.storeSize=storeSize;
|
||||
|
@ -194,7 +194,7 @@ public final class AsyncDataManager {
|
|||
Scheduler.executePeriodically(cleanupTask, 1000 * 30);
|
||||
}
|
||||
|
||||
private Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
|
||||
protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
|
||||
if (location == null) {
|
||||
location = new Location();
|
||||
location.setDataFileId(dataFile.getDataFileId());
|
||||
|
@ -213,7 +213,7 @@ public final class AsyncDataManager {
|
|||
return location;
|
||||
}
|
||||
|
||||
private void unmarshallState(ByteSequence sequence) throws IOException {
|
||||
protected void unmarshallState(ByteSequence sequence) throws IOException {
|
||||
ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength());
|
||||
DataInputStream dis = new DataInputStream(bais);
|
||||
if (dis.readBoolean()) {
|
||||
|
@ -596,7 +596,7 @@ public final class AsyncDataManager {
|
|||
storeState(sync);
|
||||
}
|
||||
|
||||
private synchronized void storeState(boolean sync) throws IOException {
|
||||
protected synchronized void storeState(boolean sync) throws IOException {
|
||||
ByteSequence state = marshallState();
|
||||
appender.storeItem(state, Location.MARK_TYPE, sync);
|
||||
controlFile.store(state, sync);
|
||||
|
|
|
@ -28,14 +28,14 @@ import org.apache.activemq.util.LinkedNode;
|
|||
*
|
||||
* @version $Revision: 1.1.1.1 $
|
||||
*/
|
||||
class DataFile extends LinkedNode implements Comparable<DataFile> {
|
||||
public class DataFile extends LinkedNode implements Comparable<DataFile> {
|
||||
|
||||
private final File file;
|
||||
private final Integer dataFileId;
|
||||
private final int preferedSize;
|
||||
protected final File file;
|
||||
protected final Integer dataFileId;
|
||||
protected final int preferedSize;
|
||||
|
||||
private int length;
|
||||
private int referenceCount;
|
||||
protected int length;
|
||||
protected int referenceCount;
|
||||
|
||||
DataFile(File file, int number, int preferedSize) {
|
||||
this.file = file;
|
||||
|
|
|
@ -0,0 +1,131 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.kaha.impl.async;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.thread.Scheduler;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* An AsyncDataManager that works in read only mode against multiple data directories.
|
||||
* Useful for reading back archived data files.
|
||||
*/
|
||||
public class ReadOnlyAsyncDataManager extends AsyncDataManager {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ReadOnlyAsyncDataManager.class);
|
||||
private final ArrayList<File> dirs;
|
||||
|
||||
public ReadOnlyAsyncDataManager(final ArrayList<File> dirs) {
|
||||
this.dirs = dirs;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public synchronized void start() throws IOException {
|
||||
if (started) {
|
||||
return;
|
||||
}
|
||||
|
||||
started = true;
|
||||
|
||||
ArrayList<File> files = new ArrayList<File>();
|
||||
for (File directory : dirs) {
|
||||
final File d = directory;
|
||||
File[] f = d.listFiles(new FilenameFilter() {
|
||||
public boolean accept(File dir, String n) {
|
||||
return dir.equals(d) && n.startsWith(filePrefix);
|
||||
}
|
||||
});
|
||||
for (int i = 0; i < f.length; i++) {
|
||||
files.add(f[i]);
|
||||
}
|
||||
}
|
||||
|
||||
for (File file : files) {
|
||||
try {
|
||||
String n = file.getName();
|
||||
String numStr = n.substring(filePrefix.length(), n.length());
|
||||
int num = Integer.parseInt(numStr);
|
||||
DataFile dataFile = new ReadOnlyDataFile(file, num, preferedFileLength);
|
||||
fileMap.put(dataFile.getDataFileId(), dataFile);
|
||||
storeSize.addAndGet(dataFile.getLength());
|
||||
} catch (NumberFormatException e) {
|
||||
// Ignore file that do not match the pattern.
|
||||
}
|
||||
}
|
||||
|
||||
// Sort the list so that we can link the DataFiles together in the
|
||||
// right order.
|
||||
List<DataFile> dataFiles = new ArrayList<DataFile>(fileMap.values());
|
||||
Collections.sort(dataFiles);
|
||||
currentWriteFile = null;
|
||||
for (DataFile df : dataFiles) {
|
||||
if (currentWriteFile != null) {
|
||||
currentWriteFile.linkAfter(df);
|
||||
}
|
||||
currentWriteFile = df;
|
||||
fileByFileMap.put(df.getFile(), df);
|
||||
}
|
||||
|
||||
// Need to check the current Write File to see if there was a partial
|
||||
// write to it.
|
||||
if (currentWriteFile != null) {
|
||||
|
||||
// See if the lastSyncedLocation is valid..
|
||||
Location l = lastAppendLocation.get();
|
||||
if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
|
||||
l = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void close() throws IOException {
|
||||
if (!started) {
|
||||
return;
|
||||
}
|
||||
accessorPool.close();
|
||||
fileMap.clear();
|
||||
fileByFileMap.clear();
|
||||
started = false;
|
||||
}
|
||||
|
||||
|
||||
public Location getFirstLocation() throws IllegalStateException, IOException {
|
||||
if( currentWriteFile == null ) {
|
||||
return null;
|
||||
}
|
||||
|
||||
DataFile first = (DataFile)currentWriteFile.getHeadNode();
|
||||
Location cur = new Location();
|
||||
cur.setDataFileId(first.getDataFileId());
|
||||
cur.setOffset(0);
|
||||
cur.setSize(0);
|
||||
return getNextLocation(cur);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean delete() throws IOException {
|
||||
throw new RuntimeException("Cannot delete a ReadOnlyAsyncDataManager");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.kaha.impl.async;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
|
||||
/**
|
||||
* Allows you to open a data file in read only mode. Useful when working with
|
||||
* archived data files.
|
||||
*/
|
||||
public class ReadOnlyDataFile extends DataFile {
|
||||
|
||||
ReadOnlyDataFile(File file, int number, int preferedSize) {
|
||||
super(file, number, preferedSize);
|
||||
}
|
||||
|
||||
|
||||
public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {
|
||||
RandomAccessFile rc = new RandomAccessFile(file, "r");
|
||||
// When we start to write files size them up so that the OS has a chance
|
||||
// to allocate the file contigously.
|
||||
if (appender) {
|
||||
if (length < preferedSize) {
|
||||
rc.setLength(preferedSize);
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
public void closeRandomAccessFile(RandomAccessFile file) throws IOException {
|
||||
file.close();
|
||||
}
|
||||
|
||||
public synchronized boolean delete() throws IOException {
|
||||
throw new RuntimeException("Not valid on a read only file.");
|
||||
}
|
||||
|
||||
public synchronized void move(File targetDirectory) throws IOException{
|
||||
throw new RuntimeException("Not valid on a read only file.");
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue