mirror of https://github.com/apache/activemq.git
Add files that were part of my .gitignore.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1406770 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9c39f4e40a
commit
4dcf75f482
|
@ -0,0 +1,124 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.kaha.impl.data;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
|
||||
/**
|
||||
* DataFile
|
||||
*
|
||||
*
|
||||
*/
|
||||
class DataFile {
|
||||
|
||||
private final File file;
|
||||
private final Integer number;
|
||||
private int referenceCount;
|
||||
private RandomAccessFile randomAcessFile;
|
||||
private Object writerData;
|
||||
private long length;
|
||||
private boolean dirty;
|
||||
|
||||
DataFile(File file, int number) {
|
||||
this.file = file;
|
||||
this.number = Integer.valueOf(number);
|
||||
length = file.exists() ? file.length() : 0;
|
||||
}
|
||||
|
||||
Integer getNumber() {
|
||||
return number;
|
||||
}
|
||||
|
||||
synchronized RandomAccessFile getRandomAccessFile() throws FileNotFoundException {
|
||||
if (randomAcessFile == null) {
|
||||
randomAcessFile = new RandomAccessFile(file, "rw");
|
||||
}
|
||||
return randomAcessFile;
|
||||
}
|
||||
|
||||
synchronized long getLength() {
|
||||
return length;
|
||||
}
|
||||
|
||||
synchronized void incrementLength(int size) {
|
||||
length += size;
|
||||
}
|
||||
|
||||
synchronized void purge() throws IOException {
|
||||
if (randomAcessFile != null) {
|
||||
randomAcessFile.close();
|
||||
randomAcessFile = null;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized boolean delete() throws IOException {
|
||||
purge();
|
||||
return file.delete();
|
||||
}
|
||||
|
||||
synchronized void close() throws IOException {
|
||||
if (randomAcessFile != null) {
|
||||
randomAcessFile.close();
|
||||
}
|
||||
}
|
||||
|
||||
synchronized int increment() {
|
||||
return ++referenceCount;
|
||||
}
|
||||
|
||||
synchronized int decrement() {
|
||||
return --referenceCount;
|
||||
}
|
||||
|
||||
synchronized boolean isUnused() {
|
||||
return referenceCount <= 0;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
String result = file.getName() + " number = " + number + " , length = " + length + " refCount = " + referenceCount;
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Opaque data that a DataFileWriter may want to associate with the
|
||||
* DataFile.
|
||||
*/
|
||||
public synchronized Object getWriterData() {
|
||||
return writerData;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param writerData - Opaque data that a DataFileWriter may want to
|
||||
* associate with the DataFile.
|
||||
*/
|
||||
public synchronized void setWriterData(Object writerData) {
|
||||
this.writerData = writerData;
|
||||
dirty = true;
|
||||
}
|
||||
|
||||
public synchronized boolean isDirty() {
|
||||
return dirty;
|
||||
}
|
||||
|
||||
public synchronized void setDirty(boolean value) {
|
||||
this.dirty = value;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,101 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.kaha.impl.data;
|
||||
|
||||
import org.apache.activemq.kaha.StoreLocation;
|
||||
|
||||
/**
|
||||
* A a wrapper for a data in the store
|
||||
*
|
||||
*
|
||||
*/
|
||||
public final class DataItem implements Item, StoreLocation {
|
||||
|
||||
private int file = (int)POSITION_NOT_SET;
|
||||
private long offset = POSITION_NOT_SET;
|
||||
private int size;
|
||||
|
||||
public DataItem() {
|
||||
}
|
||||
|
||||
DataItem(DataItem item) {
|
||||
this.file = item.file;
|
||||
this.offset = item.offset;
|
||||
this.size = item.size;
|
||||
}
|
||||
|
||||
boolean isValid() {
|
||||
return file != POSITION_NOT_SET;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return
|
||||
* @see org.apache.activemq.kaha.StoreLocation#getSize()
|
||||
*/
|
||||
public int getSize() {
|
||||
return size;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param size The size to set.
|
||||
*/
|
||||
public void setSize(int size) {
|
||||
this.size = size;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return
|
||||
* @see org.apache.activemq.kaha.StoreLocation#getOffset()
|
||||
*/
|
||||
public long getOffset() {
|
||||
return offset;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param offset The offset to set.
|
||||
*/
|
||||
public void setOffset(long offset) {
|
||||
this.offset = offset;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return
|
||||
* @see org.apache.activemq.kaha.StoreLocation#getFile()
|
||||
*/
|
||||
public int getFile() {
|
||||
return file;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param file The file to set.
|
||||
*/
|
||||
public void setFile(int file) {
|
||||
this.file = file;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a pretty print
|
||||
*/
|
||||
public String toString() {
|
||||
String result = "offset = " + offset + ", file = " + file + ", size = " + size;
|
||||
return result;
|
||||
}
|
||||
|
||||
public DataItem copy() {
|
||||
return new DataItem(this);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,408 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.kaha.impl.data;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.activemq.kaha.Marshaller;
|
||||
import org.apache.activemq.kaha.StoreLocation;
|
||||
import org.apache.activemq.kaha.impl.DataManager;
|
||||
import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
|
||||
import org.apache.activemq.util.IOExceptionSupport;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Manages DataFiles
|
||||
*
|
||||
*
|
||||
*/
|
||||
public final class DataManagerImpl implements DataManager {
|
||||
|
||||
public static final int ITEM_HEAD_SIZE = 5; // type + length
|
||||
public static final byte DATA_ITEM_TYPE = 1;
|
||||
public static final byte REDO_ITEM_TYPE = 2;
|
||||
public static final long MAX_FILE_LENGTH = 1024 * 1024 * 32;
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DataManagerImpl.class);
|
||||
private static final String NAME_PREFIX = "data-";
|
||||
|
||||
private final File directory;
|
||||
private final String name;
|
||||
private SyncDataFileReader reader;
|
||||
private SyncDataFileWriter writer;
|
||||
private DataFile currentWriteFile;
|
||||
private long maxFileLength = MAX_FILE_LENGTH;
|
||||
private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
|
||||
private Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
|
||||
private String dataFilePrefix;
|
||||
private final AtomicLong storeSize;
|
||||
|
||||
public DataManagerImpl(File dir, final String name,AtomicLong storeSize) {
|
||||
this.directory = dir;
|
||||
this.name = name;
|
||||
this.storeSize=storeSize;
|
||||
|
||||
dataFilePrefix = IOHelper.toFileSystemSafeName(NAME_PREFIX + name + "-");
|
||||
// build up list of current dataFiles
|
||||
File[] files = dir.listFiles(new FilenameFilter() {
|
||||
public boolean accept(File dir, String n) {
|
||||
return dir.equals(directory) && n.startsWith(dataFilePrefix);
|
||||
}
|
||||
});
|
||||
if (files != null) {
|
||||
for (int i = 0; i < files.length; i++) {
|
||||
File file = files[i];
|
||||
String n = file.getName();
|
||||
String numStr = n.substring(dataFilePrefix.length(), n.length());
|
||||
int num = Integer.parseInt(numStr);
|
||||
DataFile dataFile = new DataFile(file, num);
|
||||
storeSize.addAndGet(dataFile.getLength());
|
||||
fileMap.put(dataFile.getNumber(), dataFile);
|
||||
if (currentWriteFile == null || currentWriteFile.getNumber().intValue() < num) {
|
||||
currentWriteFile = dataFile;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private DataFile createAndAddDataFile(int num) {
|
||||
String fileName = dataFilePrefix + num;
|
||||
File file = new File(directory, fileName);
|
||||
DataFile result = new DataFile(file, num);
|
||||
fileMap.put(result.getNumber(), result);
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.IDataManager#getName()
|
||||
*/
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
synchronized DataFile findSpaceForData(DataItem item) throws IOException {
|
||||
if (currentWriteFile == null || ((currentWriteFile.getLength() + item.getSize()) > maxFileLength)) {
|
||||
int nextNum = currentWriteFile != null ? currentWriteFile.getNumber().intValue() + 1 : 1;
|
||||
if (currentWriteFile != null && currentWriteFile.isUnused()) {
|
||||
removeDataFile(currentWriteFile);
|
||||
}
|
||||
currentWriteFile = createAndAddDataFile(nextNum);
|
||||
}
|
||||
item.setOffset(currentWriteFile.getLength());
|
||||
item.setFile(currentWriteFile.getNumber().intValue());
|
||||
int len = item.getSize() + ITEM_HEAD_SIZE;
|
||||
currentWriteFile.incrementLength(len);
|
||||
storeSize.addAndGet(len);
|
||||
return currentWriteFile;
|
||||
}
|
||||
|
||||
DataFile getDataFile(StoreLocation item) throws IOException {
|
||||
Integer key = Integer.valueOf(item.getFile());
|
||||
DataFile dataFile = fileMap.get(key);
|
||||
if (dataFile == null) {
|
||||
LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
|
||||
throw new IOException("Could not locate data file " + NAME_PREFIX + name + "-" + item.getFile());
|
||||
}
|
||||
return dataFile;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.IDataManager#readItem(org.apache.activemq.kaha.Marshaller,
|
||||
* org.apache.activemq.kaha.StoreLocation)
|
||||
*/
|
||||
public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException {
|
||||
return getReader().readItem(marshaller, item);
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.IDataManager#storeDataItem(org.apache.activemq.kaha.Marshaller,
|
||||
* java.lang.Object)
|
||||
*/
|
||||
public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
|
||||
return getWriter().storeItem(marshaller, payload, DATA_ITEM_TYPE);
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.IDataManager#storeRedoItem(java.lang.Object)
|
||||
*/
|
||||
public synchronized StoreLocation storeRedoItem(Object payload) throws IOException {
|
||||
return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.IDataManager#updateItem(org.apache.activemq.kaha.StoreLocation,
|
||||
* org.apache.activemq.kaha.Marshaller, java.lang.Object)
|
||||
*/
|
||||
public synchronized void updateItem(StoreLocation location, Marshaller marshaller, Object payload)
|
||||
throws IOException {
|
||||
getWriter().updateItem((DataItem)location, marshaller, payload, DATA_ITEM_TYPE);
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.IDataManager#recoverRedoItems(org.apache.activemq.kaha.impl.data.RedoListener)
|
||||
*/
|
||||
public synchronized void recoverRedoItems(RedoListener listener) throws IOException {
|
||||
|
||||
// Nothing to recover if there is no current file.
|
||||
if (currentWriteFile == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
DataItem item = new DataItem();
|
||||
item.setFile(currentWriteFile.getNumber().intValue());
|
||||
item.setOffset(0);
|
||||
while (true) {
|
||||
byte type;
|
||||
try {
|
||||
type = getReader().readDataItemSize(item);
|
||||
} catch (IOException ignore) {
|
||||
LOG.trace("End of data file reached at (header was invalid): " + item);
|
||||
return;
|
||||
}
|
||||
if (type == REDO_ITEM_TYPE) {
|
||||
// Un-marshal the redo item
|
||||
Object object;
|
||||
try {
|
||||
object = readItem(redoMarshaller, item);
|
||||
} catch (IOException e1) {
|
||||
LOG.trace("End of data file reached at (payload was invalid): " + item);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
|
||||
listener.onRedoItem(item, object);
|
||||
// in case the listener is holding on to item references,
|
||||
// copy it
|
||||
// so we don't change it behind the listener's back.
|
||||
item = item.copy();
|
||||
|
||||
} catch (Exception e) {
|
||||
throw IOExceptionSupport.create("Recovery handler failed: " + e, e);
|
||||
}
|
||||
}
|
||||
// Move to the next item.
|
||||
item.setOffset(item.getOffset() + ITEM_HEAD_SIZE + item.getSize());
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.IDataManager#close()
|
||||
*/
|
||||
public synchronized void close() throws IOException {
|
||||
getWriter().close();
|
||||
for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
|
||||
DataFile dataFile = i.next();
|
||||
getWriter().force(dataFile);
|
||||
dataFile.close();
|
||||
}
|
||||
fileMap.clear();
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.IDataManager#force()
|
||||
*/
|
||||
public synchronized void force() throws IOException {
|
||||
for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
|
||||
DataFile dataFile = i.next();
|
||||
getWriter().force(dataFile);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.IDataManager#delete()
|
||||
*/
|
||||
public synchronized boolean delete() throws IOException {
|
||||
boolean result = true;
|
||||
for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
|
||||
DataFile dataFile = i.next();
|
||||
storeSize.addAndGet(-dataFile.getLength());
|
||||
result &= dataFile.delete();
|
||||
}
|
||||
fileMap.clear();
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.IDataManager#addInterestInFile(int)
|
||||
*/
|
||||
public synchronized void addInterestInFile(int file) throws IOException {
|
||||
if (file >= 0) {
|
||||
Integer key = Integer.valueOf(file);
|
||||
DataFile dataFile = fileMap.get(key);
|
||||
if (dataFile == null) {
|
||||
dataFile = createAndAddDataFile(file);
|
||||
}
|
||||
addInterestInFile(dataFile);
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void addInterestInFile(DataFile dataFile) {
|
||||
if (dataFile != null) {
|
||||
dataFile.increment();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.IDataManager#removeInterestInFile(int)
|
||||
*/
|
||||
public synchronized void removeInterestInFile(int file) throws IOException {
|
||||
if (file >= 0) {
|
||||
Integer key = Integer.valueOf(file);
|
||||
DataFile dataFile = fileMap.get(key);
|
||||
removeInterestInFile(dataFile);
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
|
||||
if (dataFile != null) {
|
||||
|
||||
if (dataFile.decrement() <= 0) {
|
||||
if (dataFile != currentWriteFile) {
|
||||
removeDataFile(dataFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.IDataManager#consolidateDataFiles()
|
||||
*/
|
||||
public synchronized void consolidateDataFiles() throws IOException {
|
||||
List<DataFile> purgeList = new ArrayList<DataFile>();
|
||||
for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
|
||||
DataFile dataFile = i.next();
|
||||
if (dataFile.isUnused() && dataFile != currentWriteFile) {
|
||||
purgeList.add(dataFile);
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < purgeList.size(); i++) {
|
||||
DataFile dataFile = purgeList.get(i);
|
||||
removeDataFile(dataFile);
|
||||
}
|
||||
}
|
||||
|
||||
private void removeDataFile(DataFile dataFile) throws IOException {
|
||||
fileMap.remove(dataFile.getNumber());
|
||||
if (writer != null) {
|
||||
writer.force(dataFile);
|
||||
}
|
||||
storeSize.addAndGet(-dataFile.getLength());
|
||||
boolean result = dataFile.delete();
|
||||
LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed"));
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.IDataManager#getRedoMarshaller()
|
||||
*/
|
||||
public Marshaller getRedoMarshaller() {
|
||||
return redoMarshaller;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.IDataManager#setRedoMarshaller(org.apache.activemq.kaha.Marshaller)
|
||||
*/
|
||||
public void setRedoMarshaller(Marshaller redoMarshaller) {
|
||||
this.redoMarshaller = redoMarshaller;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maxFileLength
|
||||
*/
|
||||
public long getMaxFileLength() {
|
||||
return maxFileLength;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param maxFileLength the maxFileLength to set
|
||||
*/
|
||||
public void setMaxFileLength(long maxFileLength) {
|
||||
this.maxFileLength = maxFileLength;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "DataManager:(" + NAME_PREFIX + name + ")";
|
||||
}
|
||||
|
||||
public synchronized SyncDataFileReader getReader() {
|
||||
if (reader == null) {
|
||||
reader = createReader();
|
||||
}
|
||||
return reader;
|
||||
}
|
||||
|
||||
protected synchronized SyncDataFileReader createReader() {
|
||||
return new SyncDataFileReader(this);
|
||||
}
|
||||
|
||||
public synchronized void setReader(SyncDataFileReader reader) {
|
||||
this.reader = reader;
|
||||
}
|
||||
|
||||
public synchronized SyncDataFileWriter getWriter() {
|
||||
if (writer == null) {
|
||||
writer = createWriter();
|
||||
}
|
||||
return writer;
|
||||
}
|
||||
|
||||
private SyncDataFileWriter createWriter() {
|
||||
return new SyncDataFileWriter(this);
|
||||
}
|
||||
|
||||
public synchronized void setWriter(SyncDataFileWriter writer) {
|
||||
this.writer = writer;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.kaha.impl.data;
|
||||
|
||||
/**
|
||||
* A a wrapper for a data in the store
|
||||
*
|
||||
*
|
||||
*/
|
||||
public interface Item {
|
||||
long POSITION_NOT_SET = -1;
|
||||
short MAGIC = 31317;
|
||||
int ACTIVE = 22;
|
||||
int FREE = 33;
|
||||
int LOCATION_SIZE = 24;
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.kaha.impl.data;
|
||||
|
||||
import org.apache.activemq.kaha.StoreLocation;
|
||||
|
||||
|
||||
public interface RedoListener {
|
||||
|
||||
void onRedoItem(StoreLocation item, Object object) throws Exception;
|
||||
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.kaha.impl.data;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import org.apache.activemq.kaha.Marshaller;
|
||||
import org.apache.activemq.kaha.StoreLocation;
|
||||
import org.apache.activemq.util.DataByteArrayInputStream;
|
||||
|
||||
/**
|
||||
* Optimized Store reader
|
||||
*
|
||||
*
|
||||
*/
|
||||
public final class SyncDataFileReader {
|
||||
|
||||
private DataManagerImpl dataManager;
|
||||
private DataByteArrayInputStream dataIn;
|
||||
|
||||
/**
|
||||
* Construct a Store reader
|
||||
*
|
||||
* @param fileId
|
||||
*/
|
||||
SyncDataFileReader(DataManagerImpl fileManager) {
|
||||
this.dataManager = fileManager;
|
||||
this.dataIn = new DataByteArrayInputStream();
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.DataFileReader#readDataItemSize(org.apache.activemq.kaha.impl.data.DataItem)
|
||||
*/
|
||||
public synchronized byte readDataItemSize(DataItem item) throws IOException {
|
||||
RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
|
||||
file.seek(item.getOffset()); // jump to the size field
|
||||
byte rc = file.readByte();
|
||||
item.setSize(file.readInt());
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.DataFileReader#readItem(org.apache.activemq.kaha.Marshaller,
|
||||
* org.apache.activemq.kaha.StoreLocation)
|
||||
*/
|
||||
public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException {
|
||||
RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
|
||||
|
||||
// TODO: we could reuse the buffer in dataIn if it's big enough to avoid
|
||||
// allocating byte[] arrays on every readItem.
|
||||
byte[] data = new byte[item.getSize()];
|
||||
file.seek(item.getOffset() + DataManagerImpl.ITEM_HEAD_SIZE);
|
||||
file.readFully(data);
|
||||
dataIn.restart(data);
|
||||
return marshaller.readPayload(dataIn);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,115 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.kaha.impl.data;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
|
||||
import org.apache.activemq.kaha.Marshaller;
|
||||
import org.apache.activemq.util.DataByteArrayOutputStream;
|
||||
|
||||
/**
|
||||
* Optimized Store writer. Synchronously marshalls and writes to the data file.
|
||||
* Simple but may introduce a bit of contention when put under load.
|
||||
*
|
||||
*
|
||||
*/
|
||||
public final class SyncDataFileWriter {
|
||||
|
||||
private DataByteArrayOutputStream buffer;
|
||||
private DataManagerImpl dataManager;
|
||||
|
||||
/**
|
||||
* Construct a Store writer
|
||||
*
|
||||
* @param fileId
|
||||
*/
|
||||
SyncDataFileWriter(DataManagerImpl fileManager) {
|
||||
this.dataManager = fileManager;
|
||||
this.buffer = new DataByteArrayOutputStream();
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.DataFileWriter#storeItem(org.apache.activemq.kaha.Marshaller,
|
||||
* java.lang.Object, byte)
|
||||
*/
|
||||
public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type)
|
||||
throws IOException {
|
||||
|
||||
// Write the packet our internal buffer.
|
||||
buffer.reset();
|
||||
buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
|
||||
marshaller.writePayload(payload, buffer);
|
||||
int size = buffer.size();
|
||||
int payloadSize = size - DataManagerImpl.ITEM_HEAD_SIZE;
|
||||
buffer.reset();
|
||||
buffer.writeByte(type);
|
||||
buffer.writeInt(payloadSize);
|
||||
|
||||
// Find the position where this item will land at.
|
||||
DataItem item = new DataItem();
|
||||
item.setSize(payloadSize);
|
||||
DataFile dataFile = dataManager.findSpaceForData(item);
|
||||
|
||||
// Now splat the buffer to the file.
|
||||
dataFile.getRandomAccessFile().seek(item.getOffset());
|
||||
dataFile.getRandomAccessFile().write(buffer.getData(), 0, size);
|
||||
dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
|
||||
|
||||
dataManager.addInterestInFile(dataFile);
|
||||
return item;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.activemq.kaha.impl.data.DataFileWriter#updateItem(org.apache.activemq.kaha.StoreLocation,
|
||||
* org.apache.activemq.kaha.Marshaller, java.lang.Object, byte)
|
||||
*/
|
||||
public synchronized void updateItem(DataItem item, Marshaller marshaller, Object payload, byte type)
|
||||
throws IOException {
|
||||
// Write the packet our internal buffer.
|
||||
buffer.reset();
|
||||
buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
|
||||
marshaller.writePayload(payload, buffer);
|
||||
int size = buffer.size();
|
||||
int payloadSize = size - DataManagerImpl.ITEM_HEAD_SIZE;
|
||||
buffer.reset();
|
||||
buffer.writeByte(type);
|
||||
buffer.writeInt(payloadSize);
|
||||
item.setSize(payloadSize);
|
||||
DataFile dataFile = dataManager.getDataFile(item);
|
||||
RandomAccessFile file = dataFile.getRandomAccessFile();
|
||||
file.seek(item.getOffset());
|
||||
file.write(buffer.getData(), 0, size);
|
||||
dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
|
||||
}
|
||||
|
||||
public synchronized void force(DataFile dataFile) throws IOException {
|
||||
// If our dirty marker was set.. then we need to sync
|
||||
if (dataFile.getWriterData() != null && dataFile.isDirty()) {
|
||||
dataFile.getRandomAccessFile().getFD().sync();
|
||||
dataFile.setWriterData(null);
|
||||
dataFile.setDirty(false);
|
||||
}
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue