Added a new org.apache.activemq.kaha.impl.async package that holds a data manager/journal implementing both the Kaha DataManager and ActiveIO Journal interfaces.

- Initial benchmarks show it to be as fast as or faster than the default ActiveIO Journal.
- The biggest differentiator is that this implementation of the journal was also built to provide fast reads.
- The DataManager interface was extracted, so the KahaStore can now switch between the original DataManager implementation and the new implementation in the kaha.impl.async package; a usage sketch follows below.
- Simplified the original implementation by removing its AsyncDataWriters code, since that is largely what the new package is based on.
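For illustration only (not part of the diff), a minimal sketch of switching a store over to the new implementation; the KahaStore constructor arguments are assumptions, while setUseAsyncDataManager and getDataManager appear in the KahaStore changes below:

// Hypothetical usage: route a store's data files through the new
// async data manager instead of the original DataManagerImpl.
KahaStore store = new KahaStore("kaha.db", "rw"); // ctor args assumed
store.setUseAsyncDataManager(true); // wraps an AsyncDataManager in a DataManagerFacade
DataManager dm = store.getDataManager("defaultContainer");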



git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@479089 13f79535-47bb-0310-9956-ffa450edef68
Hiram R. Chirino 2006-11-25 06:00:56 +00:00
parent 3bf0245e76
commit 118c806907
34 changed files with 2907 additions and 584 deletions

View File

@@ -48,6 +48,12 @@
<artifactId>activeio-core</artifactId>
<optional>false</optional>
</dependency>
<dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activeio-core</artifactId>
<optional>false</optional>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>

View File

@@ -0,0 +1,42 @@
package org.apache.activemq.kaha.impl;
import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.data.RedoListener;
public interface DataManager {
String getName();
Object readItem(Marshaller marshaller, StoreLocation item)
throws IOException;
StoreLocation storeDataItem(Marshaller marshaller, Object payload)
throws IOException;
StoreLocation storeRedoItem(Object payload) throws IOException;
void updateItem(StoreLocation location, Marshaller marshaller,
Object payload) throws IOException;
void recoverRedoItems(RedoListener listener) throws IOException;
void close() throws IOException;
void force() throws IOException;
boolean delete() throws IOException;
void addInterestInFile(int file) throws IOException;
void removeInterestInFile(int file) throws IOException;
void consolidateDataFiles() throws IOException;
Marshaller getRedoMarshaller();
void setRedoMarshaller(Marshaller redoMarshaller);
}
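As a hedged sketch of the round trip this extracted interface supports (dm is any DataManager implementation; ObjectMarshaller is assumed to be an available Marshaller implementation):

// Hypothetical round trip through the DataManager interface.
Marshaller marshaller = new ObjectMarshaller(); // assumed marshaller
StoreLocation loc = dm.storeDataItem(marshaller, "payload");
dm.force(); // make sure the item is on disk
Object restored = dm.readItem(marshaller, loc);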

View File

@@ -27,7 +27,6 @@ import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.container.ContainerId;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexManager;

View File

@@ -33,11 +33,13 @@ import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.DataManagerFacade;
import org.apache.activemq.kaha.impl.container.BaseContainerImpl;
import org.apache.activemq.kaha.impl.container.ContainerId;
import org.apache.activemq.kaha.impl.container.ListContainerImpl;
import org.apache.activemq.kaha.impl.container.MapContainerImpl;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.data.DataManagerImpl;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.data.RedoListener;
import org.apache.activemq.kaha.impl.index.IndexItem;
@@ -73,8 +75,8 @@ public class KahaStore implements Store{
private String mode;
private boolean initialized;
private boolean logIndexChanges=false;
private boolean useAsyncWriter=false;
private long maxDataFileLength=DataManager.MAX_FILE_LENGTH;
private boolean useAsyncDataManager=false;
private long maxDataFileLength=1024*1024*32;
private FileLock lock;
private String indexType=IndexTypes.DISK_INDEX;
@@ -319,10 +321,21 @@ public class KahaStore implements Store{
public synchronized DataManager getDataManager(String name) throws IOException{
DataManager dm=(DataManager)dataManagers.get(name);
if(dm==null){
dm=new DataManager(directory,name);
dm.setMaxFileLength(maxDataFileLength);
dm.setUseAsyncWriter(isUseAsyncWriter());
recover(dm);
if( isUseAsyncDataManager() ) {
AsyncDataManager t=new AsyncDataManager();
t.setDirectory(directory);
t.setFilePrefix("data-"+name+"-");
t.setMaxFileLength((int) maxDataFileLength);
t.start();
dm=new DataManagerFacade(t, name);
} else {
DataManagerImpl t=new DataManagerImpl(directory,name);
t.setMaxFileLength(maxDataFileLength);
dm=t;
}
if( logIndexChanges ) {
recover(dm);
}
dataManagers.put(name,dm);
}
return dm;
@@ -339,7 +352,6 @@ public class KahaStore implements Store{
private void recover(final DataManager dm) throws IOException{
dm.recoverRedoItems(new RedoListener(){
public void onRedoItem(StoreLocation item,Object o) throws Exception{
RedoStoreIndexItem redo=(RedoStoreIndexItem)o;
// IndexManager im = getIndexManager(dm, redo.getIndexName());
@@ -531,12 +543,12 @@ public class KahaStore implements Store{
}
}
public synchronized boolean isUseAsyncWriter() {
return useAsyncWriter;
public synchronized boolean isUseAsyncDataManager() {
return useAsyncDataManager;
}
public synchronized void setUseAsyncWriter(boolean useAsyncWriter) {
this.useAsyncWriter = useAsyncWriter;
public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) {
this.useAsyncDataManager = useAsyncWriter;
}

View File

@@ -0,0 +1,481 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.async;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
import org.apache.activemq.util.ByteSequence;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Manages DataFiles
*
* @version $Revision: 1.1.1.1 $
*/
public final class AsyncDataManager {
private static final Log log=LogFactory.getLog(AsyncDataManager.class);
public static int CONTROL_RECORD_MAX_LENGTH=1024;
public static final int ITEM_HEAD_RESERVED_SPACE=21;
// ITEM_HEAD_SPACE = length + type + reserved space + SOR
public static final int ITEM_HEAD_SPACE=4+1+ITEM_HEAD_RESERVED_SPACE+3;
public static final int ITEM_HEAD_OFFSET_TO_SOR=ITEM_HEAD_SPACE-3;
public static final int ITEM_FOOT_SPACE=3; // EOR
public static final int ITEM_HEAD_FOOT_SPACE=ITEM_HEAD_SPACE+ITEM_FOOT_SPACE;
public static final byte[] ITEM_HEAD_SOR=new byte[]{'S', 'O', 'R'}; //
public static final byte[] ITEM_HEAD_EOR=new byte[]{'E', 'O', 'R'}; //
public static final byte DATA_ITEM_TYPE=1;
public static final byte REDO_ITEM_TYPE=2;
public static String DEFAULT_DIRECTORY="data";
public static String DEFAULT_FILE_PREFIX="data-";
public static int DEFAULT_MAX_FILE_LENGTH=1024*1024*32;
private File directory = new File(DEFAULT_DIRECTORY);
private String filePrefix=DEFAULT_FILE_PREFIX;
private int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
private int preferedFileLength = DEFAULT_MAX_FILE_LENGTH-1024*512;
private DataFileAppender appender;
private DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
private Map<Integer,DataFile> fileMap=new HashMap<Integer,DataFile>();
private DataFile currentWriteFile;
ControlFile controlFile;
private Location mark;
private final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
boolean started = false;
boolean useNio = true;
protected final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
@SuppressWarnings("unchecked")
public synchronized void start() throws IOException {
if( started ) {
return;
}
started=true;
directory.mkdirs();
controlFile = new ControlFile(new File(directory, filePrefix+"control"), CONTROL_RECORD_MAX_LENGTH);
controlFile.lock();
ByteSequence sequence = controlFile.load();
if( sequence != null && sequence.getLength()>0 ) {
unmarshallState(sequence);
}
if( useNio) {
appender = new NIODataFileAppender(this);
} else {
appender = new DataFileAppender(this);
}
File[] files=directory.listFiles(new FilenameFilter(){
public boolean accept(File dir,String n){
return dir.equals(directory)&&n.startsWith(filePrefix);
}
});
if(files!=null){
for(int i=0;i<files.length;i++){
try {
File file=files[i];
String n=file.getName();
String numStr=n.substring(filePrefix.length(),n.length());
int num=Integer.parseInt(numStr);
DataFile dataFile=new DataFile(file,num, preferedFileLength);
fileMap.put(dataFile.getDataFileId(),dataFile);
} catch (NumberFormatException e) {
// Ignore files that do not match the pattern.
}
}
// Sort the list so that we can link the DataFiles together in the right order.
ArrayList<DataFile> l = new ArrayList<DataFile>(fileMap.values());
Collections.sort(l);
currentWriteFile=null;
for (DataFile df : l) {
if( currentWriteFile!=null ) {
currentWriteFile.linkAfter(df);
}
currentWriteFile=df;
}
}
// Need to check the current Write File to see if there was a partial write to it.
if( currentWriteFile!=null ) {
// See if the lastSyncedLocation is valid..
Location l = lastAppendLocation.get();
if( l!=null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue() ) {
l=null;
}
// If we know the last location that was ok.. then we can skip lots of checking
l = recoveryCheck(currentWriteFile, l);
lastAppendLocation.set(l);
}
storeState(false);
}
private Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
if( location == null ) {
location = new Location();
location.setDataFileId(dataFile.getDataFileId());
location.setOffset(0);
}
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
reader.readLocationDetails(location);
while( reader.readLocationDetailsAndValidate(location) ) {
location.setOffset(location.getOffset()+location.getSize());
}
} finally {
accessorPool.closeDataFileAccessor(reader);
}
dataFile.setLength(location.getOffset());
return location;
}
private void unmarshallState(ByteSequence sequence) throws IOException {
ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength());
DataInputStream dis = new DataInputStream(bais);
if( dis.readBoolean() ) {
mark = new Location();
mark.readExternal(dis);
} else {
mark = null;
}
if( dis.readBoolean() ) {
Location l = new Location();
l.readExternal(dis);
lastAppendLocation.set(l);
} else {
lastAppendLocation.set(null);
}
}
private ByteSequence marshallState() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
if( mark!=null ) {
dos.writeBoolean(true);
mark.writeExternal(dos);
} else {
dos.writeBoolean(false);
}
Location l = lastAppendLocation.get();
if( l!=null ) {
dos.writeBoolean(true);
l.writeExternal(dos);
} else {
dos.writeBoolean(false);
}
byte[] bs = baos.toByteArray();
return new ByteSequence(bs,0,bs.length);
}
synchronized DataFile allocateLocation(Location location) throws IOException{
if(currentWriteFile==null||((currentWriteFile.getLength()+location.getSize())>maxFileLength)){
int nextNum=currentWriteFile!=null?currentWriteFile.getDataFileId().intValue()+1:1;
String fileName=filePrefix+nextNum;
DataFile nextWriteFile=new DataFile(new File(directory,fileName),nextNum, preferedFileLength);
fileMap.put(nextWriteFile.getDataFileId(),nextWriteFile);
if( currentWriteFile!=null ) {
currentWriteFile.linkAfter(nextWriteFile);
if(currentWriteFile.isUnused()){
removeDataFile(currentWriteFile);
}
}
currentWriteFile=nextWriteFile;
}
location.setOffset(currentWriteFile.getLength());
location.setDataFileId(currentWriteFile.getDataFileId().intValue());
currentWriteFile.incrementLength(location.getSize());
currentWriteFile.increment();
return currentWriteFile;
}
DataFile getDataFile(Location item) throws IOException{
Integer key=new Integer(item.getDataFileId());
DataFile dataFile=(DataFile) fileMap.get(key);
if(dataFile==null){
log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
throw new IOException("Could not locate data file "+filePrefix+"-"+item.getDataFileId());
}
return dataFile;
}
private DataFile getNextDataFile(DataFile dataFile) {
return (DataFile) dataFile.getNext();
}
public synchronized void close() throws IOException{
accessorPool.close();
storeState(false);
appender.close();
fileMap.clear();
controlFile.unlock();
controlFile.dispose();
}
public synchronized boolean delete() throws IOException{
boolean result=true;
for(Iterator i=fileMap.values().iterator();i.hasNext();){
DataFile dataFile=(DataFile) i.next();
result&=dataFile.delete();
}
fileMap.clear();
return result;
}
public synchronized void addInterestInFile(int file) throws IOException{
if(file>=0){
Integer key=new Integer(file);
DataFile dataFile=(DataFile) fileMap.get(key);
if(dataFile==null){
throw new IOException("That data file does not exist");
}
addInterestInFile(dataFile);
}
}
synchronized void addInterestInFile(DataFile dataFile){
if(dataFile!=null){
dataFile.increment();
}
}
public synchronized void removeInterestInFile(int file) throws IOException{
if(file>=0){
Integer key=new Integer(file);
DataFile dataFile=(DataFile) fileMap.get(key);
removeInterestInFile(dataFile);
}
}
synchronized void removeInterestInFile(DataFile dataFile) throws IOException{
if(dataFile!=null){
if(dataFile.decrement()<=0){
if(dataFile!=currentWriteFile){
removeDataFile(dataFile);
}
}
}
}
public synchronized void consolidateDataFiles() throws IOException{
List<DataFile> purgeList=new ArrayList<DataFile>();
for (DataFile dataFile : fileMap.values()) {
if(dataFile.isUnused() && dataFile != currentWriteFile){
purgeList.add(dataFile);
}
}
for (DataFile dataFile : purgeList) {
removeDataFile(dataFile);
}
}
private void removeDataFile(DataFile dataFile) throws IOException{
fileMap.remove(dataFile.getDataFileId());
dataFile.unlink();
boolean result=dataFile.delete();
log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
}
/**
* @return the maxFileLength
*/
public int getMaxFileLength(){
return maxFileLength;
}
/**
* @param maxFileLength the maxFileLength to set
*/
public void setMaxFileLength(int maxFileLength){
this.maxFileLength=maxFileLength;
}
public String toString(){
return "DataManager:("+filePrefix+")";
}
public synchronized Location getMark() throws IllegalStateException {
return mark;
}
public Location getNextLocation(Location location) throws IOException, IllegalStateException {
Location cur = null;
while( true ) {
if( cur == null ) {
if( location == null ) {
DataFile head = (DataFile) currentWriteFile.getHeadNode();
cur = new Location();
cur.setDataFileId(head.getDataFileId());
cur.setOffset(0);
// DataFileAccessor reader = accessorPool.openDataFileAccessor(head);
// try {
// if( !reader.readLocationDetailsAndValidate(cur) ) {
// return null;
// }
// } finally {
// accessorPool.closeDataFileAccessor(reader);
// }
} else {
// Set to the next offset..
cur = new Location(location);
cur.setOffset(cur.getOffset()+cur.getSize());
}
} else {
cur.setOffset(cur.getOffset()+cur.getSize());
}
DataFile dataFile = getDataFile(cur);
// Did it go into the next file??
if( dataFile.getLength() <= cur.getOffset() ) {
dataFile = getNextDataFile(dataFile);
if( dataFile == null ) {
return null;
} else {
cur.setDataFileId(dataFile.getDataFileId().intValue());
cur.setOffset(0);
}
}
// Load in location size and type.
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
reader.readLocationDetails(cur);
} finally {
accessorPool.closeDataFileAccessor(reader);
}
if( cur.getType() == 0 ) {
return null;
} else if( cur.getType() > 0 ) {
// Only return user records.
return cur;
}
}
}
public ByteSequence read(Location location) throws IOException, IllegalStateException {
DataFile dataFile = getDataFile(location);
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
ByteSequence rc=null;
try {
rc = reader.readRecord(location);
} finally {
accessorPool.closeDataFileAccessor(reader);
}
return rc;
}
public synchronized void setMark(Location location, boolean sync) throws IOException, IllegalStateException {
mark = location;
storeState(sync);
}
private void storeState(boolean sync) throws IOException {
ByteSequence state = marshallState();
appender.storeItem(state, Location.MARK_TYPE, sync);
controlFile.store(state, sync);
}
public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
return appender.storeItem(data, Location.USER_TYPE, sync);
}
public Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
return appender.storeItem(data, type, sync);
}
public void update(Location location, ByteSequence data, boolean sync) throws IOException {
DataFile dataFile = getDataFile(location);
DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
try {
updater.updateRecord(location, data, sync);
} finally {
accessorPool.closeDataFileAccessor(updater);
}
}
public File getDirectory() {
return directory;
}
public void setDirectory(File directory) {
this.directory = directory;
}
public String getFilePrefix() {
return filePrefix;
}
public void setFilePrefix(String filePrefix) {
this.filePrefix = filePrefix;
}
public ConcurrentHashMap<WriteKey, WriteCommand> getInflightWrites() {
return inflightWrites;
}
public Location getLastAppendLocation() {
return lastAppendLocation.get();
}
public void setLastAppendLocation(Location lastSyncedLocation) {
this.lastAppendLocation.set(lastSyncedLocation);
}
}
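A rough sketch of driving the new journal directly, using only methods shown above (the directory and payload are illustrative):

// Hypothetical: write a record synchronously, read it back, shut down.
AsyncDataManager adm = new AsyncDataManager();
adm.setDirectory(new File("activemq-data")); // illustrative directory
adm.setFilePrefix("data-test-");
adm.start(); // locks the control file and scans existing data files
byte[] payload = "hello".getBytes();
Location loc = adm.write(new ByteSequence(payload, 0, payload.length), true); // sync write
ByteSequence read = adm.read(loc); // random read by location, the fast-read path
adm.close();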

View File

@@ -0,0 +1,161 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.async;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import org.apache.activemq.util.ByteSequence;
/**
* Used to reliably store fixed sized state data. It stores the state in a
* record that is versioned and written twice in the file, so that a failure in the
* middle of writing the first or second record does not result in an unknown
* state.
*
* @version $Revision: 1.1 $
*/
final public class ControlFile {
private final static boolean DISABLE_FILE_LOCK = "true".equals(System.getProperty("java.nio.channels.FileLock.broken", "false"));
private final File file;
/** The File that holds the control data. */
private final RandomAccessFile randomAccessFile;
private final int maxRecordSize;
private long version=0;
private FileLock lock;
private boolean disposed;
public ControlFile(File file, int recordSize) throws IOException {
this.file = file;
this.maxRecordSize = recordSize+4;
randomAccessFile = new RandomAccessFile(file, "rw");
}
/**
* Locks the control file.
* @throws IOException
*/
public void lock() throws IOException {
if( DISABLE_FILE_LOCK )
return;
if( lock == null ) {
lock = randomAccessFile.getChannel().tryLock();
if (lock == null) {
throw new IOException("Control file '"+file+"' could not be locked.");
}
}
}
/**
* Unlocks the control file.
*
* @throws IOException
*/
public void unlock() throws IOException {
if( DISABLE_FILE_LOCK )
return;
if (lock != null) {
lock.release();
lock = null;
}
}
public void dispose() {
if( disposed )
return;
disposed=true;
try {
unlock();
} catch (IOException e) {
}
try {
randomAccessFile.close();
} catch (IOException e) {
}
}
synchronized public ByteSequence load() throws IOException {
long l = randomAccessFile.length();
if( l < maxRecordSize ) {
return null;
}
randomAccessFile.seek(0);
long v1 = randomAccessFile.readLong();
randomAccessFile.seek(maxRecordSize+8);
long v1check = randomAccessFile.readLong();
randomAccessFile.seek(maxRecordSize+16);
long v2 = randomAccessFile.readLong();
randomAccessFile.seek((maxRecordSize*2)+24);
long v2check = randomAccessFile.readLong();
byte[] data=null;
if( v2 == v2check ) {
version = v2;
randomAccessFile.seek(maxRecordSize+24);
int size = randomAccessFile.readInt();
data = new byte[size];
randomAccessFile.readFully(data);
} else if ( v1 == v1check ){
version = v1;
randomAccessFile.seek(8); // first copy's data begins right after its 8-byte version stamp
int size = randomAccessFile.readInt();
data = new byte[size];
randomAccessFile.readFully(data);
} else {
// Bummer.. both version checks failed, so we don't know
// if either of the two buffers is ok. This should
// only happen if the data got corrupted.
throw new IOException("Control data corrupted.");
}
return new ByteSequence(data,0,data.length);
}
public void store(ByteSequence data, boolean sync) throws IOException {
version++;
randomAccessFile.setLength((maxRecordSize*2)+32);
randomAccessFile.seek(0);
// Write the first copy of the control data.
randomAccessFile.writeLong(version);
randomAccessFile.writeInt(data.getLength());
randomAccessFile.write(data.getData());
randomAccessFile.writeLong(version);
// Write the second copy of the control data.
randomAccessFile.writeLong(version);
randomAccessFile.writeInt(data.getLength());
randomAccessFile.write(data.getData());
randomAccessFile.writeLong(version);
if( sync ) {
randomAccessFile.getFD().sync();
}
}
}
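For orientation, a sketch of the lock/load/store lifecycle as AsyncDataManager uses it above (the file name and record size are illustrative):

// Hypothetical control-file lifecycle.
ControlFile cf = new ControlFile(new File(directory, "data-control"), 1024);
cf.lock(); // fails fast if another process holds the store
ByteSequence state = cf.load(); // newest intact copy, or null on first run
// ... rebuild or mutate the state ...
cf.store(newState, true); // writes both version-stamped copies, then fsyncs
cf.unlock();
cf.dispose();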

View File

@@ -0,0 +1,108 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.async;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.activemq.util.LinkedNode;
/**
* DataFile
*
* @version $Revision: 1.1.1.1 $
*/
class DataFile extends LinkedNode implements Comparable {
private final File file;
private final Integer dataFileId;
private final int preferedSize;
int length=0;
private int referenceCount;
DataFile(File file, int number, int preferedSize){
this.file=file;
this.preferedSize = preferedSize;
this.dataFileId=new Integer(number);
length=(int)(file.exists()?file.length():0);
}
public Integer getDataFileId(){
return dataFileId;
}
public synchronized int getLength(){
return length;
}
public void setLength(int length) {
this.length=length;
}
public synchronized void incrementLength(int size){
length+=size;
}
public synchronized int increment(){
return ++referenceCount;
}
public synchronized int decrement(){
return --referenceCount;
}
public synchronized boolean isUnused(){
return referenceCount<=0;
}
public synchronized String toString(){
String result = file.getName() + " number = " + dataFileId + " , length = " + length + " refCount = " + referenceCount;
return result;
}
public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {
RandomAccessFile rc=new RandomAccessFile(file,"rw");
// When we start to write files, size them up front so that the OS has a chance
// to allocate the file contiguously.
if( appender ){
if( length < preferedSize ) {
rc.setLength(preferedSize);
}
}
return rc;
}
public void closeRandomAccessFile(RandomAccessFile file) throws IOException {
// On close set the file size to the real size.
if( length != file.length() ) {
file.setLength(getLength());
}
file.close();
}
public synchronized boolean delete() throws IOException{
return file.delete();
}
public int compareTo(Object o) {
DataFile df = (DataFile) o;
return dataFileId - df.dataFileId;
}
}

View File

@@ -0,0 +1,143 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.async;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
import org.apache.activemq.util.ByteSequence;
/**
* Optimized Store reader and updater. Single threaded and synchronous. Use in conjunction
* with the DataFileAccessorPool for concurrent use.
*
* @version $Revision: 1.1.1.1 $
*/
final class DataFileAccessor {
private final DataFile dataFile;
private final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites;
private final RandomAccessFile file;
private boolean disposed;
/**
* Construct a Store reader
*
* @param dataManager
* @param dataFile
* @throws IOException
*/
public DataFileAccessor(AsyncDataManager dataManager, DataFile dataFile) throws IOException{
this.dataFile = dataFile;
this.inflightWrites = dataManager.getInflightWrites();
this.file = dataFile.openRandomAccessFile(false);
}
public DataFile getDataFile() {
return dataFile;
}
public void dispose() {
if( disposed )
return;
disposed=true;
try {
dataFile.closeRandomAccessFile(file);
} catch (IOException e) {
e.printStackTrace();
}
}
public ByteSequence readRecord(Location location) throws IOException {
if( !location.isValid() || location.getSize()==Location.NOT_SET )
throw new IOException("Invalid location: "+location);
WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location));
if( asyncWrite!= null ) {
return asyncWrite.data;
}
try {
byte[] data=new byte[location.getSize()-AsyncDataManager.ITEM_HEAD_FOOT_SPACE];
file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE);
file.readFully(data);
return new ByteSequence(data, 0, data.length);
} catch (RuntimeException e) {
throw new IOException("Invalid location: "+location+", : "+e);
}
}
public void readLocationDetails(Location location) throws IOException {
WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location));
if( asyncWrite!= null ) {
location.setSize(asyncWrite.location.getSize());
location.setType(asyncWrite.location.getType());
} else {
file.seek(location.getOffset());
location.setSize(file.readInt());
location.setType(file.readByte());
}
}
public boolean readLocationDetailsAndValidate(Location location) {
try {
WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location));
if( asyncWrite!= null ) {
location.setSize(asyncWrite.location.getSize());
location.setType(asyncWrite.location.getType());
} else {
file.seek(location.getOffset());
location.setSize(file.readInt());
location.setType(file.readByte());
byte data[] = new byte[3];
file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_OFFSET_TO_SOR);
file.readFully(data);
if( data[0] != AsyncDataManager.ITEM_HEAD_SOR[0] ||
data[1] != AsyncDataManager.ITEM_HEAD_SOR[1] ||
data[2] != AsyncDataManager.ITEM_HEAD_SOR[2] ) {
return false;
}
file.seek(location.getOffset()+location.getSize()-AsyncDataManager.ITEM_FOOT_SPACE);
file.readFully(data);
if( data[0] != AsyncDataManager.ITEM_HEAD_EOR[0] ||
data[1] != AsyncDataManager.ITEM_HEAD_EOR[1] ||
data[2] != AsyncDataManager.ITEM_HEAD_EOR[2] ) {
return false;
}
}
} catch (IOException e) {
return false;
}
return true;
}
public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE);
int size = Math.min(data.getLength(), location.getSize());
file.write(data.getData(), data.getOffset(), size);
if( sync ) {
file.getFD().sync();
}
}
}
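To make the offsets used by readRecord() and the validation above concrete, this is the record layout implied by the AsyncDataManager constants:

// Record layout (derived from the constants in AsyncDataManager):
// [size:int 4][type:byte 1][reserved:21]["SOR":3][payload]["EOR":3]
// ITEM_HEAD_SPACE = 4 + 1 + 21 + 3 = 29 bytes
// ITEM_HEAD_FOOT_SPACE = 29 + 3 = 32 bytes
// so readRecord() seeks offset+29 and reads size-32 payload bytes.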

View File

@@ -0,0 +1,138 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.async;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
/**
* Used to pool DataFileAccessors.
*
* @author chirino
*/
public class DataFileAccessorPool {
private final AsyncDataManager dataManager;
private final HashMap<Integer, Pool> pools = new HashMap<Integer, Pool>();
private boolean closed=false;
int MAX_OPEN_READERS_PER_FILE=5;
class Pool {
private final DataFile file;
private final ArrayList<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
private boolean used;
public Pool(DataFile file) {
this.file = file;
}
public DataFileAccessor openDataFileReader() throws IOException {
DataFileAccessor rc=null;
if( pool.isEmpty() ) {
rc = new DataFileAccessor(dataManager, file);
} else {
rc = (DataFileAccessor) pool.remove(pool.size()-1);
}
used=true;
return rc;
}
public void closeDataFileReader(DataFileAccessor reader) {
used=true;
if(pool.size() >= MAX_OPEN_READERS_PER_FILE ) {
reader.dispose();
} else {
pool.add(reader);
}
}
public void clearUsedMark() {
used=false;
}
public boolean isUsed() {
return used;
}
public void dispose() {
for (DataFileAccessor reader : pool) {
reader.dispose();
}
pool.clear();
}
}
public DataFileAccessorPool(AsyncDataManager dataManager){
this.dataManager=dataManager;
}
synchronized void clearUsedMark() {
for (Iterator iter = pools.values().iterator(); iter.hasNext();) {
Pool pool = (Pool) iter.next();
pool.clearUsedMark();
}
}
synchronized void disposeUnused() {
for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
Pool pool = iter.next();
if( !pool.isUsed() ) {
pool.dispose();
iter.remove();
}
}
}
synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
if( closed ) {
throw new IOException("Closed.");
}
Pool pool = pools.get(dataFile.getDataFileId());
if( pool == null ) {
pool = new Pool(dataFile);
pools.put(dataFile.getDataFileId(), pool);
}
return pool.openDataFileReader();
}
synchronized void closeDataFileAccessor(DataFileAccessor reader) {
Pool pool = pools.get(reader.getDataFile().getDataFileId());
if( pool == null || closed ) {
reader.dispose();
} else {
pool.closeDataFileReader(reader);
}
}
synchronized public void close() {
if(closed)
return;
closed=true;
for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
Pool pool = iter.next();
pool.dispose();
}
pools.clear();
}
}
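The intended checkout/check-in pattern, mirrored from how AsyncDataManager.read() uses the pool above (dataFile and location are assumed to be in scope):

// Hypothetical: always return the accessor in a finally block.
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
ByteSequence data = reader.readRecord(location);
} finally {
// returned to the per-file pool (at most MAX_OPEN_READERS_PER_FILE kept)
accessorPool.closeDataFileAccessor(reader);
}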

View File

@@ -0,0 +1,380 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.async;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.LinkedNode;
/**
* An optimized writer that does batch appends to a data file. This object is thread safe
* and gains throughput as the number of concurrent writes increases.
*
* @version $Revision: 1.1.1.1 $
*/
class DataFileAppender {
protected static final byte []RESERVED_SPACE= new byte[AsyncDataManager.ITEM_HEAD_RESERVED_SPACE];
protected static final String SHUTDOWN_COMMAND = "SHUTDOWN";
int MAX_WRITE_BATCH_SIZE = 1024*1024*4;
static public class WriteKey {
private final int file;
private final long offset;
private final int hash;
public WriteKey(Location item){
file = item.getDataFileId();
offset = item.getOffset();
// TODO: see if we can build a better hash
hash = (int) (file ^ offset);
}
public int hashCode() {
return hash;
}
public boolean equals(Object obj) {
WriteKey di = (WriteKey)obj;
return di.file == file && di.offset == offset;
}
}
public class WriteBatch {
public final DataFile dataFile;
public final WriteCommand first;
public CountDownLatch latch;
public int size;
public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
this.dataFile=dataFile;
this.first=write;
size+=write.location.getSize();
if( write.sync ) {
latch = new CountDownLatch(1);
}
}
public boolean canAppend(DataFile dataFile, WriteCommand write) {
if( dataFile != this.dataFile )
return false;
if( size+write.location.getSize() >= MAX_WRITE_BATCH_SIZE )
return false;
return true;
}
public void append(WriteCommand write) throws IOException {
this.first.getTailNode().linkAfter(write);
size+=write.location.getSize();
if( write.sync && latch==null ) {
latch = new CountDownLatch(1);
}
}
}
public static class WriteCommand extends LinkedNode {
public final Location location;
public final ByteSequence data;
final boolean sync;
public WriteCommand(Location location, ByteSequence data, boolean sync) {
this.location = location;
this.data = data;
this.sync = sync;
}
}
protected final AsyncDataManager dataManager;
protected final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites;
protected final Object enqueueMutex = new Object();
protected WriteBatch nextWriteBatch;
private boolean running;
protected boolean shutdown;
protected IOException firstAsyncException;
protected final CountDownLatch shutdownDone = new CountDownLatch(1);
private Thread thread;
/**
* Construct a Store writer
*
* @param dataManager
*/
public DataFileAppender(AsyncDataManager dataManager){
this.dataManager=dataManager;
this.inflightWrites = this.dataManager.getInflightWrites();
}
/**
* @param data
* @param type
* @param sync
* @return the location where the data was stored.
* @throws IOException
*/
public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
// Write the packet to our internal buffer.
int size = data.getLength()+AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
final Location location=new Location();
location.setSize(size);
location.setType(type);
WriteBatch batch;
WriteCommand write = new WriteCommand(location, data, sync);
// Locate the data file and enqueue the write in a synchronized block so that
// writes get enqueued in the order that their locations were assigned by
// the data manager (which is basically just appending)
synchronized(this) {
// Find the position where this item will land at.
DataFile dataFile=dataManager.allocateLocation(location);
batch = enqueue(dataFile, write);
}
if( sync ) {
try {
batch.latch.await();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
} else {
inflightWrites.put(new WriteKey(location), write);
}
return location;
}
private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException {
synchronized(enqueueMutex) {
WriteBatch rc=null;
if( shutdown ) {
throw new IOException("Async Writter Thread Shutdown");
}
if( firstAsyncException !=null )
throw firstAsyncException;
if( !running ) {
running=true;
thread = new Thread() {
public void run() {
processQueue();
}
};
thread.setPriority(Thread.MAX_PRIORITY);
thread.setDaemon(true);
thread.setName("ActiveMQ Data File Writer");
thread.start();
}
if( nextWriteBatch == null ) {
nextWriteBatch = new WriteBatch(dataFile,write);
rc = nextWriteBatch;
enqueueMutex.notify();
} else {
// Append to current batch if possible..
if( nextWriteBatch.canAppend(dataFile, write) ) {
nextWriteBatch.append(write);
rc = nextWriteBatch;
} else {
// Otherwise wait for the pending nextWriteBatch to be taken by the writer thread
try {
while( nextWriteBatch!=null ) {
enqueueMutex.wait();
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
if( shutdown ) {
throw new IOException("Async Writter Thread Shutdown");
}
// Start a new batch.
nextWriteBatch = new WriteBatch(dataFile,write);
rc = nextWriteBatch;
enqueueMutex.notify();
}
}
return rc;
}
}
public void close() throws IOException {
synchronized( enqueueMutex ) {
if( shutdown == false ) {
shutdown = true;
if( running ) {
enqueueMutex.notifyAll();
} else {
shutdownDone.countDown();
}
}
}
try {
shutdownDone.await();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
/**
* The async processing loop that writes to the data files and
* does the force calls.
*
* Since the file sync() call is the slowest of all the operations,
* this algorithm tries to 'batch' or group together several file sync() requests
* into a single file sync() call. The batching is accomplished by attaching the
* same CountDownLatch instance to every force request in a group.
*
*/
protected void processQueue() {
DataFile dataFile=null;
RandomAccessFile file=null;
try {
DataByteArrayOutputStream buff = new DataByteArrayOutputStream(MAX_WRITE_BATCH_SIZE);
while( true ) {
Object o = null;
// Block till we get a command.
synchronized(enqueueMutex) {
while( true ) {
if( shutdown ) {
o = SHUTDOWN_COMMAND;
break;
}
if( nextWriteBatch!=null ) {
o = nextWriteBatch;
nextWriteBatch=null;
break;
}
enqueueMutex.wait();
}
enqueueMutex.notify();
}
if( o == SHUTDOWN_COMMAND ) {
break;
}
WriteBatch wb = (WriteBatch) o;
if( dataFile != wb.dataFile ) {
if( file!=null ) {
dataFile.closeRandomAccessFile(file);
}
dataFile = wb.dataFile;
file = dataFile.openRandomAccessFile(true);
}
WriteCommand write = wb.first;
// Write all the data.
// Only need to seek to first location.. all others
// are in sequence.
file.seek(write.location.getOffset());
//
// is it just 1 big write?
if( wb.size == write.location.getSize() ) {
// Just write it directly..
file.writeInt(write.location.getSize());
file.writeByte(write.location.getType());
file.write(RESERVED_SPACE);
file.write(AsyncDataManager.ITEM_HEAD_SOR);
file.write(write.data.getData(),write.data.getOffset(), write.data.getLength());
file.write(AsyncDataManager.ITEM_HEAD_EOR);
} else {
// Combine the smaller writes into 1 big buffer
while( write!=null ) {
buff.writeInt(write.location.getSize());
buff.writeByte(write.location.getType());
buff.write(RESERVED_SPACE);
buff.write(AsyncDataManager.ITEM_HEAD_SOR);
buff.write(write.data.getData(),write.data.getOffset(), write.data.getLength());
buff.write(AsyncDataManager.ITEM_HEAD_EOR);
write = (WriteCommand) write.getNext();
}
// Now do the 1 big write.
ByteSequence sequence = buff.toByteSequence();
file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
buff.reset();
}
file.getFD().sync();
WriteCommand lastWrite = (WriteCommand) wb.first.getTailNode();
dataManager.setLastAppendLocation( lastWrite.location );
// Signal any waiting threads that the write is on disk.
if( wb.latch!=null ) {
wb.latch.countDown();
}
// Now that the data is on disk, remove the writes from the in flight
// cache.
write = wb.first;
while( write!=null ) {
if( !write.sync ) {
inflightWrites.remove(new WriteKey(write.location));
}
write = (WriteCommand) write.getNext();
}
}
} catch (IOException e) {
synchronized( enqueueMutex ) {
firstAsyncException = e;
}
} catch (InterruptedException e) {
} finally {
try {
if( file!=null ) {
dataFile.closeRandomAccessFile(file);
}
} catch (IOException e) {
}
shutdownDone.countDown();
}
}
}
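A rough illustration of the batching described in the class javadoc: concurrent sync writers land in the same WriteBatch and share a single sync() via the batch latch. The surrounding setup is assumed; dataManager is a started AsyncDataManager visible to the threads.

// Hypothetical: ten threads issue sync writes; the appender groups them
// so far fewer than ten file sync() calls are made.
for (int i = 0; i < 10; i++) {
final byte[] payload = ("record-" + i).getBytes();
new Thread(new Runnable() {
public void run() {
try {
// blocks on the shared batch latch until the whole batch is forced
dataManager.write(new ByteSequence(payload, 0, payload.length), true);
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}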

View File

@@ -0,0 +1,157 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.async;
import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.data.RedoListener;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
/**
* Provides a Kaha DataManager facade to the AsyncDataManager.
*
* @version $Revision: 1.1.1.1 $
*/
public final class DataManagerFacade implements org.apache.activemq.kaha.impl.DataManager {
private static class StoreLocationFacade implements StoreLocation {
private final Location location;
public StoreLocationFacade(Location location) {
this.location = location;
}
public int getFile() {
return location.getDataFileId();
}
public long getOffset() {
return location.getOffset();
}
public int getSize() {
return location.getSize();
}
public Location getLocation() {
return location;
}
}
static private StoreLocation convertToStoreLocation(Location location) {
if(location==null)
return null;
return new StoreLocationFacade(location);
}
static private Location convertFromStoreLocation(StoreLocation location) {
if(location==null)
return null;
if( location.getClass()== StoreLocationFacade.class )
return ((StoreLocationFacade)location).getLocation();
Location l = new Location();
l.setOffset((int) location.getOffset());
l.setSize(location.getSize());
l.setDataFileId(location.getFile());
return l;
}
static final private ByteSequence FORCE_COMMAND = new ByteSequence(new byte[]{'F', 'O', 'R', 'C', 'E'});
AsyncDataManager dataManager;
private final String name;
private Marshaller redoMarshaller;
public DataManagerFacade(AsyncDataManager dataManager, String name) {
this.dataManager=dataManager;
this.name = name;
}
public Object readItem(Marshaller marshaller, StoreLocation location) throws IOException {
ByteSequence sequence = dataManager.read(convertFromStoreLocation(location));
DataByteArrayInputStream dataIn = new DataByteArrayInputStream(sequence);
return marshaller.readPayload(dataIn);
}
public StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
marshaller.writePayload(payload,buffer);
ByteSequence data = buffer.toByteSequence();
return convertToStoreLocation(dataManager.write(data, (byte)1, false));
}
public void force() throws IOException {
dataManager.write(FORCE_COMMAND, (byte)2, true);
}
public void updateItem(StoreLocation location, Marshaller marshaller, Object payload) throws IOException {
final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
marshaller.writePayload(payload,buffer);
ByteSequence data = buffer.toByteSequence();
dataManager.update(convertFromStoreLocation(location), data, false);
}
public void close() throws IOException {
dataManager.close();
}
public void consolidateDataFiles() throws IOException {
dataManager.consolidateDataFiles();
}
public boolean delete() throws IOException {
return dataManager.delete();
}
public void addInterestInFile(int file) throws IOException {
dataManager.addInterestInFile(file);
}
public void removeInterestInFile(int file) throws IOException {
dataManager.removeInterestInFile(file);
}
public void recoverRedoItems(RedoListener listener) throws IOException {
throw new RuntimeException("Not Implemented..");
}
public StoreLocation storeRedoItem(Object payload) throws IOException {
throw new RuntimeException("Not Implemented..");
}
public Marshaller getRedoMarshaller() {
return redoMarshaller;
}
public void setRedoMarshaller(Marshaller redoMarshaller) {
this.redoMarshaller = redoMarshaller;
}
public String getName() {
return name;
}
}

View File

@ -0,0 +1,108 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.async;
import java.io.IOException;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.util.ByteSequence;
/**
* Provides an ActiveIO Journal facade to the AsyncDataManager.
*
* @version $Revision: 1.1.1.1 $
*/
public final class JournalFacade implements Journal {
public static class RecordLocationFacade implements RecordLocation {
private final Location location;
public RecordLocationFacade(Location location) {
this.location = location;
}
public Location getLocation() {
return location;
}
public int compareTo(Object o) {
RecordLocationFacade rlf = (RecordLocationFacade)o;
int rc = location.compareTo(rlf.location);
return rc;
}
}
static private RecordLocation convertToRecordLocation(Location location) {
if(location==null)
return null;
return new RecordLocationFacade(location);
}
static private Location convertFromRecordLocation(RecordLocation location) {
if(location==null)
return null;
return ((RecordLocationFacade)location).getLocation();
}
AsyncDataManager dataManager;
public JournalFacade(AsyncDataManager dataManager) {
this.dataManager = dataManager;
}
public void close() throws IOException {
dataManager.close();
}
public RecordLocation getMark() throws IllegalStateException {
return convertToRecordLocation(dataManager.getMark());
}
public RecordLocation getNextRecordLocation(RecordLocation location) throws InvalidRecordLocationException, IOException, IllegalStateException {
return convertToRecordLocation(dataManager.getNextLocation(convertFromRecordLocation(location)));
}
public Packet read(RecordLocation location) throws InvalidRecordLocationException, IOException, IllegalStateException {
ByteSequence rc = dataManager.read(convertFromRecordLocation(location));
if( rc == null )
return null;
return new ByteArrayPacket(rc.getData(), rc.getOffset(), rc.getLength());
}
public void setJournalEventListener(JournalEventListener listener) throws IllegalStateException {
}
public void setMark(RecordLocation location, boolean sync) throws InvalidRecordLocationException, IOException, IllegalStateException {
dataManager.setMark(convertFromRecordLocation(location), sync);
}
public RecordLocation write(Packet packet, boolean sync) throws IOException, IllegalStateException {
org.apache.activeio.packet.ByteSequence data = packet.asByteSequence();
ByteSequence sequence = new ByteSequence(data.getData(), data.getOffset(), data.getLength());
return convertToRecordLocation(dataManager.write(sequence, sync));
}
}
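A hedged sketch of using the new store through the ActiveIO Journal interface (adm is a started AsyncDataManager; the ByteArrayPacket constructor taking a byte[] is an assumption):

// Hypothetical: drop-in Journal usage backed by the new data manager.
Journal journal = new JournalFacade(adm);
RecordLocation loc = journal.write(new ByteArrayPacket("event".getBytes()), true);
journal.setMark(loc, true); // checkpoint, persisted via the control file
Packet packet = journal.read(loc);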

View File

@@ -0,0 +1,126 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.async;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* Used as a location in the data store.
*
* @version $Revision: 1.2 $
*/
public final class Location {
public static final byte MARK_TYPE=-1;
public static final byte USER_TYPE=1;
public static final byte NOT_SET_TYPE=0;
public static final int NOT_SET=-1;
private int dataFileId=NOT_SET;
private int offset=NOT_SET;
private int size=NOT_SET;
private byte type=NOT_SET_TYPE;
public Location(){}
Location(Location item) {
this.dataFileId = item.dataFileId;
this.offset = item.offset;
this.size = item.size;
this.type = item.type;
}
boolean isValid(){
return dataFileId != NOT_SET;
}
/**
* @return the size of the data record including the header.
*/
public int getSize(){
return size;
}
/**
* @param size the size of the data record including the header.
*/
public void setSize(int size){
this.size=size;
}
/**
* @return the size of the payload of the record.
*/
public int getPaylodSize() {
return size-AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
}
public int getOffset(){
return offset;
}
public void setOffset(int offset){
this.offset=offset;
}
public int getDataFileId(){
return dataFileId;
}
public void setDataFileId(int file){
this.dataFileId=file;
}
public byte getType() {
return type;
}
public void setType(byte type) {
this.type = type;
}
public String toString(){
String result="offset = "+offset+", file = " + dataFileId + ", size = "+size + ", type = "+type;
return result;
}
public int compareTo(Object o) {
Location l = (Location)o;
if( dataFileId == l.dataFileId ) {
int rc = offset-l.offset;
return rc;
}
return dataFileId - l.dataFileId;
}
public void writeExternal(DataOutput dos) throws IOException {
dos.writeInt(dataFileId);
dos.writeInt(offset);
dos.writeInt(size);
dos.writeByte(type);
}
public void readExternal(DataInput dis) throws IOException {
dataFileId = dis.readInt();
offset = dis.readInt();
size = dis.readInt();
type = dis.readByte();
}
}
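A small round-trip sketch of the 13-byte serialized form (int file + int offset + int size + byte type) written by writeExternal() above:

// Hypothetical serialization round trip.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Location out = new Location();
out.setDataFileId(1);
out.setOffset(0);
out.setSize(64);
out.setType(Location.USER_TYPE);
out.writeExternal(new DataOutputStream(baos)); // writes 13 bytes
Location in = new Location();
in.readExternal(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));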

View File

@@ -0,0 +1,213 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.async;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* A DataFileAppender that uses NIO ByteBuffers and file channels to copy
* data to files more efficiently.
*
* @version $Revision: 1.1.1.1 $
*/
class NIODataFileAppender extends DataFileAppender {
public NIODataFileAppender(AsyncDataManager fileManager) {
super(fileManager);
}
/**
* The async processing loop that writes to the data files and
* does the force calls.
*
* Since the file sync() call is the slowest of all the operations,
* this algorithm tries to 'batch' or group together several file sync() requests
* into a single file sync() call. The batching is accomplished by attaching the
* same CountDownLatch instance to every force request in a group.
*
*/
protected void processQueue() {
DataFile dataFile=null;
RandomAccessFile file=null;
FileChannel channel=null;
try {
ByteBuffer header = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_HEAD_SPACE);
ByteBuffer footer = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_FOOT_SPACE);
ByteBuffer buffer = ByteBuffer.allocateDirect(MAX_WRITE_BATCH_SIZE);
// Populate the static parts of the headers and footers..
header.putInt(0); // size
header.put((byte) 0); // type
header.put(RESERVED_SPACE); // reserved
header.put(AsyncDataManager.ITEM_HEAD_SOR);
footer.put(AsyncDataManager.ITEM_HEAD_EOR);
while( true ) {
Object o = null;
// Block till we get a command.
synchronized(enqueueMutex) {
while( true ) {
if( shutdown ) {
o = SHUTDOWN_COMMAND;
break;
}
if( nextWriteBatch!=null ) {
o = nextWriteBatch;
nextWriteBatch=null;
break;
}
enqueueMutex.wait();
}
enqueueMutex.notify();
}
if( o == SHUTDOWN_COMMAND ) {
break;
}
WriteBatch wb = (WriteBatch) o;
if( dataFile != wb.dataFile ) {
if( file!=null ) {
dataFile.closeRandomAccessFile(file);
}
dataFile = wb.dataFile;
file = dataFile.openRandomAccessFile(true);
channel = file.getChannel();
}
WriteCommand write = wb.first;
// Write all the data.
// Only need to seek to first location.. all others
// are in sequence.
file.seek(write.location.getOffset());
//
// is it just 1 big write?
if( wb.size == write.location.getSize() ) {
header.clear();
header.putInt(write.location.getSize());
header.put(write.location.getType());
// clear() only rewinds the position; the reserved bytes and SOR filled in
// at startup are still in the buffer behind the freshly written size and type.
header.clear();
transfer(header, channel);
ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength());
transfer(source, channel);
footer.clear();
transfer(footer, channel);
} else {
// Combine the smaller writes into 1 big buffer
while( write!=null ) {
header.clear();
header.putInt(write.location.getSize());
header.put(write.location.getType());
header.clear();
copy(header, buffer);
assert !header.hasRemaining();
ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength());
copy(source, buffer);
assert !source.hasRemaining();
footer.clear();
copy(footer, buffer);
assert !footer.hasRemaining();
write = (WriteCommand) write.getNext();
}
// Fully write out the buffer..
buffer.flip();
transfer(buffer, channel);
buffer.clear();
}
channel.force(false);
WriteCommand lastWrite = (WriteCommand) wb.first.getTailNode();
dataManager.setLastAppendLocation( lastWrite.location );
// Signal any waiting threads that the write is on disk.
if( wb.latch!=null ) {
wb.latch.countDown();
}
// Now that the data is on disk, remove the writes from the in flight
// cache.
write = wb.first;
while( write!=null ) {
if( !write.sync ) {
inflightWrites.remove(new WriteKey(write.location));
}
write = (WriteCommand) write.getNext();
}
}
} catch (IOException e) {
synchronized( enqueueMutex ) {
firstAsyncException = e;
}
} catch (InterruptedException e) {
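// interrupted: we are shutting down, fall through to the cleanup below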
} finally {
try {
if( file!=null ) {
dataFile.closeRandomAccessFile(file);
}
} catch (IOException e) {
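// ignore failures while closing the data file during shutdown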
}
shutdownDone.countDown();
}
}
/**
* Copy the bytes in header to the channel.
* @param header - source of data
* @param channel - destination where the data will be written.
* @throws IOException
*/
private void transfer(ByteBuffer header, FileChannel channel) throws IOException {
while (header.hasRemaining()) {
channel.write(header);
}
}
private int copy(ByteBuffer src, ByteBuffer dest) {
int rc = Math.min(dest.remaining(), src.remaining());
if( rc > 0 ) {
// Adjust our limit so that we don't overflow the dest buffer.
int limit = src.limit();
src.limit(src.position()+rc);
dest.put(src);
// restore the limit.
src.limit(limit);
}
return rc;
}
}
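The batch-the-syncs idea above stands on its own. A minimal sketch, with entirely hypothetical names: producer threads enqueue records that all share the current CountDownLatch, and the single appender thread drains whatever has queued up, writes it, issues one force(), and counts the latch down to release every waiter in that batch.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

class BatchedSyncSketch {
    static class Write {
        final byte[] data;
        Write(byte[] data) { this.data = data; }
    }

    private final List<Write> queue = new ArrayList<Write>();
    private CountDownLatch currentBatch = new CountDownLatch(1);

    // Called by many producer threads; await() the returned latch for a synchronous write.
    public synchronized CountDownLatch enqueue(byte[] data) {
        queue.add(new Write(data));
        notify();
        return currentBatch;
    }

    // Called in a loop by the single appender thread.
    public void drainTo(FileChannel channel) throws IOException, InterruptedException {
        List<Write> batch;
        CountDownLatch latch;
        synchronized (this) {
            while (queue.isEmpty()) {
                wait();
            }
            batch = new ArrayList<Write>(queue);
            queue.clear();
            latch = currentBatch;
            currentBatch = new CountDownLatch(1); // later writes join the next batch
        }
        for (Write w : batch) {
            ByteBuffer src = ByteBuffer.wrap(w.data);
            while (src.hasRemaining()) {
                channel.write(src);
            }
        }
        channel.force(false); // one sync covers every write in the batch
        latch.countDown();    // release all threads waiting on this batch
    }
}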

View File

@ -24,7 +24,7 @@ import java.util.List;
import org.apache.activemq.kaha.IndexTypes;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.DataManager;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.DiskIndexLinkedList;
import org.apache.activemq.kaha.impl.index.IndexItem;

View File

@ -27,7 +27,7 @@ import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.DataManager;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexManager;

View File

@ -32,7 +32,7 @@ import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.DataManager;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexLinkedList;

View File

@ -1,95 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.data;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Map;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.data.AsyncDataFileWriter.WriteCommand;
import org.apache.activemq.kaha.impl.data.AsyncDataFileWriter.WriteKey;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayInputStream;
/**
* Optimized Store reader
*
* @version $Revision: 1.1.1.1 $
*/
final class AsyncDataFileReader implements DataFileReader {
// static final Log log = LogFactory.getLog(AsyncDataFileReader.class);
private DataManager dataManager;
private DataByteArrayInputStream dataIn;
private final Map inflightWrites;
/**
* Construct a Store reader
*
* @param file
*/
AsyncDataFileReader(DataManager fileManager, AsyncDataFileWriter writer){
this.dataManager=fileManager;
this.inflightWrites = writer.getInflightWrites();
this.dataIn=new DataByteArrayInputStream();
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.data.DataFileReader#readDataItemSize(org.apache.activemq.kaha.impl.data.DataItem)
*/
public synchronized byte readDataItemSize(DataItem item) throws IOException {
WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(item));
if( asyncWrite!= null ) {
item.setSize(asyncWrite.location.getSize());
return asyncWrite.data[0];
}
RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
byte rc;
synchronized(file) {
file.seek(item.getOffset()); // jump to the size field
rc = file.readByte();
item.setSize(file.readInt());
}
return rc;
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.data.DataFileReader#readItem(org.apache.activemq.kaha.Marshaller, org.apache.activemq.kaha.StoreLocation)
*/
public synchronized Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(item));
if( asyncWrite!= null ) {
ByteArrayInputStream stream = new ByteArrayInputStream(asyncWrite.data, DataManager.ITEM_HEAD_SIZE, item.getSize());
return marshaller.readPayload(new DataInputStream(stream));
}
RandomAccessFile file=dataManager.getDataFile(item).getRandomAccessFile();
// TODO: we could reuse the buffer in dataIn if it's big enough to avoid
// allocating byte[] arrays on every readItem.
byte[] data=new byte[item.getSize()];
synchronized(file) {
file.seek(item.getOffset()+DataManager.ITEM_HEAD_SIZE);
file.readFully(data);
}
dataIn.restart(data);
return marshaller.readPayload(dataIn);
}
}
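The reader above never waits on the async writer: it consults the map of in-flight writes first and only then falls back to the file. That read-your-writes guard, reduced to a sketch with hypothetical names:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Map;

class ReadYourWritesSketch {
    // inflight maps an item key (file id + offset) to the bytes still queued for the writer
    static byte[] read(Map inflight, Object key, RandomAccessFile file, long offset, int size)
            throws IOException {
        byte[] queued = (byte[]) inflight.get(key);
        if (queued != null) {
            return queued; // not yet on disk: serve straight from memory
        }
        byte[] data = new byte[size];
        synchronized (file) {
            file.seek(offset);
            file.readFully(data);
        }
        return data;
    }
}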

View File

@ -1,376 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.data;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.util.DataByteArrayOutputStream;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
/**
* Optimized Store writer that uses an async thread to do batched writes to
* the datafile.
*
* @version $Revision: 1.1.1.1 $
*/
final class AsyncDataFileWriter implements DataFileWriter {
// static final Log log = LogFactory.getLog(AsyncDataFileWriter.class);
private static final String SHUTDOWN_COMMAND = "SHUTDOWN";
public static class WriteKey {
private final int file;
private final long offset;
private final int hash;
public WriteKey(StoreLocation item){
file = item.getFile();
offset = item.getOffset();
// TODO: see if we can build a better hash
hash = (int) (file ^ offset);
}
public int hashCode() {
return hash;
}
public boolean equals(Object obj) {
WriteKey di = (WriteKey)obj;
return di.file == file && di.offset == offset;
}
}
public static class WriteCommand {
public final StoreLocation location;
public final RandomAccessFile dataFile;
public final byte[] data;
public final CountDownLatch latch;
public WriteCommand(StoreLocation location, RandomAccessFile dataFile, byte[] data, CountDownLatch latch) {
this.location = location;
this.dataFile = dataFile;
this.data = data;
this.latch = latch;
}
public String toString() {
return "write: "+location+", latch = "+System.identityHashCode(latch);
}
}
private DataManager dataManager;
private final Object enqueueMutex = new Object();
private final LinkedList queue = new LinkedList();
// Maps WriteKey -> WriteCommand for all the writes that still have not landed on
// disk.
private final ConcurrentHashMap inflightWrites = new ConcurrentHashMap();
private final UsageManager usage = new UsageManager();
private CountDownLatch latchAssignedToNewWrites = new CountDownLatch(1);
private boolean running;
private boolean shutdown;
private IOException firstAsyncException;
private final CountDownLatch shutdownDone = new CountDownLatch(1);
/**
* Construct a Store writer
*
* @param file
*/
AsyncDataFileWriter(DataManager fileManager){
this.dataManager=fileManager;
this.usage.setLimit(1024*1024*8); // Allow about 8 megs of concurrent data to be queued up
}
public void force(final DataFile dataFile) throws IOException {
try {
CountDownLatch latch = null;
synchronized( enqueueMutex ) {
latch = (CountDownLatch) dataFile.getWriterData();
}
if( latch==null ) {
return;
}
latch.await();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
/**
* @param marshaller
* @param payload
* @param type
* @return
* @throws IOException
*/
public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
// We may need to slow down if we are pounding the async thread too
// hard..
try {
usage.waitForSpace();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
// Write the packet to our internal buffer.
final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
buffer.position(DataManager.ITEM_HEAD_SIZE);
marshaller.writePayload(payload,buffer);
final int size=buffer.size();
int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
buffer.reset();
buffer.writeByte(type);
buffer.writeInt(payloadSize);
final DataItem item=new DataItem();
item.setSize(payloadSize);
usage.increaseUsage(size);
// Locate the data file and enqueue onto the executor in a synchronized block so that
// writes get enqueued onto the executor in the order they were assigned by
// the data manager (which is basically just appending)
WriteCommand write;
synchronized(enqueueMutex) {
// Find the position where this item will land at.
final DataFile dataFile=dataManager.findSpaceForData(item);
dataManager.addInterestInFile(dataFile);
dataFile.setWriterData(latchAssignedToNewWrites);
write = new WriteCommand(item, dataFile.getRandomAccessFile(), buffer.getData(), latchAssignedToNewWrites);
enqueue(write);
}
inflightWrites.put(new WriteKey(item), write);
return item;
}
/**
*
*/
public void updateItem(final DataItem item,Marshaller marshaller,Object payload,byte type) throws IOException{
// We may need to slow down if we are pounding the async thread too
// hard..
try{
usage.waitForSpace();
}catch(InterruptedException e){
throw new InterruptedIOException();
}
synchronized(enqueueMutex){
// Write the packet to our internal buffer.
final DataByteArrayOutputStream buffer=new DataByteArrayOutputStream();
buffer.position(DataManager.ITEM_HEAD_SIZE);
marshaller.writePayload(payload,buffer);
final int size=buffer.size();
int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
buffer.reset();
buffer.writeByte(type);
buffer.writeInt(payloadSize);
item.setSize(payloadSize);
final DataFile dataFile=dataManager.getDataFile(item);
usage.increaseUsage(size);
WriteCommand write=new WriteCommand(item,dataFile.getRandomAccessFile(),buffer.getData(),
latchAssignedToNewWrites);
// Enqueue the write to the async thread.
synchronized(enqueueMutex){
dataFile.setWriterData(latchAssignedToNewWrites);
enqueue(write);
}
inflightWrites.put(new WriteKey(item),write);
}
}
private void enqueue(Object command) throws IOException {
if( shutdown ) {
throw new IOException("Async Writter Thread Shutdown");
}
if( firstAsyncException !=null )
throw firstAsyncException;
if( !running ) {
running=true;
Thread thread = new Thread() {
public void run() {
processQueue();
}
};
thread.setPriority(Thread.MAX_PRIORITY);
thread.setDaemon(true);
thread.setName("ActiveMQ Data File Writer");
thread.start();
}
queue.addLast(command);
enqueueMutex.notify();
}
private Object dequeue() {
synchronized( enqueueMutex ) {
while( queue.isEmpty() ) {
inflightWrites.clear();
try {
enqueueMutex.wait();
} catch (InterruptedException e) {
return SHUTDOWN_COMMAND;
}
}
return queue.removeFirst();
}
}
public void close() throws IOException {
synchronized( enqueueMutex ) {
if( shutdown == false ) {
shutdown = true;
if( running ) {
queue.add(SHUTDOWN_COMMAND);
enqueueMutex.notify();
} else {
shutdownDone.countDown();
}
}
}
try {
shutdownDone.await();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
boolean isShutdown() {
synchronized( enqueueMutex ) {
return shutdown;
}
}
/**
* The async processing loop that writes to the data files and
* issues the force calls.
*
* Since the file sync() call is the slowest of all the operations,
* this algorithm tries to 'batch' or group together several file sync() requests
* into a single file sync() call. The batching is accomplished by attaching the
* same CountDownLatch instance to every force request in a group.
*
*/
private void processQueue() {
// log.debug("Async thread startup");
try {
CountDownLatch currentBatchLatch=null;
RandomAccessFile currentBatchDataFile=null;
while( true ) {
// Block till we get a command.
Object o = dequeue();
// log.debug("Processing: "+o);
if( o == SHUTDOWN_COMMAND ) {
if( currentBatchLatch!=null ) {
currentBatchDataFile.getFD().sync();
currentBatchLatch.countDown();
}
break;
} else if( o.getClass() == CountDownLatch.class ) {
// The CountDownLatch is used as the end of batch indicator.
// Must match..
if( o == currentBatchLatch ) {
currentBatchDataFile.getFD().sync();
currentBatchLatch.countDown();
currentBatchLatch=null;
currentBatchDataFile=null;
} else {
new IOException("Got an out of sequence end of end of batch indicator.");
}
} else if( o.getClass() == WriteCommand.class ) {
WriteCommand write = (WriteCommand) o;
if( currentBatchDataFile == null )
currentBatchDataFile = write.dataFile;
// We may need to prematurely sync the batch
// if the user is switching between data files.
if( currentBatchDataFile!=write.dataFile ) {
currentBatchDataFile.getFD().sync();
currentBatchDataFile = write.dataFile;
}
// Write to the data..
int size = write.location.getSize()+DataManager.ITEM_HEAD_SIZE;
synchronized(write.dataFile) {
write.dataFile.seek(write.location.getOffset());
write.dataFile.write(write.data,0,size);
}
inflightWrites.remove(new WriteKey(write.location));
usage.decreaseUsage(size);
// Start of a batch..
if( currentBatchLatch == null ) {
currentBatchLatch = write.latch;
synchronized(enqueueMutex) {
// get the request threads to start using a new latch..
// write commands already in the queue should have the
// same latch assigned.
latchAssignedToNewWrites = new CountDownLatch(1);
if( !shutdown ) {
enqueue(currentBatchLatch);
}
}
} else if( currentBatchLatch!=write.latch ) {
// the latch on subsequent writes should match.
new IOException("Got an out of sequence write");
}
}
}
} catch (IOException e) {
synchronized( enqueueMutex ) {
firstAsyncException = e;
}
// log.debug("Aync thread shutdown due to error: "+e,e);
} finally {
// log.debug("Aync thread shutdown");
shutdownDone.countDown();
}
}
public synchronized ConcurrentHashMap getInflightWrites() {
return inflightWrites;
}
}
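On the WriteKey TODO above: (int) (file ^ offset) drops the high 32 bits of the offset, and offsets whose low bits echo the file id collide. One conventional alternative, a sketch rather than anything this commit ships, folds the offset before mixing in the file id:

class WriteKeyHashSketch {
    static int hash(int file, long offset) {
        // fold the high and low words of the offset, then mix in the file id
        return 31 * file + (int) (offset ^ (offset >>> 32));
    }
}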

View File

@ -1,24 +0,0 @@
package org.apache.activemq.kaha.impl.data;
import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
interface DataFileReader {
/**
* Sets the size property on a DataItem and returns the type byte the item
* was stored with.
*
* @param item
* @return the type byte of the item
* @throws IOException
*/
byte readDataItemSize(DataItem item) throws IOException;
Object readItem(Marshaller marshaller, StoreLocation item)
throws IOException;
}

View File

@ -1,27 +0,0 @@
package org.apache.activemq.kaha.impl.data;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;
interface DataFileWriter {
/**
* @param marshaller
* @param payload
* @param type
* @return the stored DataItem
* @throws IOException
* @throws FileNotFoundException
*/
public DataItem storeItem(Marshaller marshaller, Object payload,
byte type) throws IOException;
public void updateItem(DataItem item, Marshaller marshaller,
Object payload, byte type) throws IOException;
public void force(DataFile dataFile) throws IOException;
public void close() throws IOException;
}

View File

@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.DataManager;
import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.commons.logging.Log;
@ -37,21 +38,19 @@ import org.apache.commons.logging.LogFactory;
*
* @version $Revision: 1.1.1.1 $
*/
public final class DataManager{
public final class DataManagerImpl implements DataManager {
private static final Log log=LogFactory.getLog(DataManager.class);
private static final Log log=LogFactory.getLog(DataManagerImpl.class);
public static long MAX_FILE_LENGTH=1024*1024*32;
private static final String NAME_PREFIX="data-";
private final File dir;
private final String name;
private DataFileReader reader;
private DataFileWriter writer;
private SyncDataFileReader reader;
private SyncDataFileWriter writer;
private DataFile currentWriteFile;
private long maxFileLength = MAX_FILE_LENGTH;
Map fileMap=new HashMap();
private boolean useAsyncWriter=false;
public static final int ITEM_HEAD_SIZE=5; // type + length
public static final byte DATA_ITEM_TYPE=1;
public static final byte REDO_ITEM_TYPE=2;
@ -59,7 +58,7 @@ public final class DataManager{
Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
private String dataFilePrefix;
public DataManager(File dir, final String name){
public DataManagerImpl(File dir, final String name){
this.dir=dir;
this.name=name;
@ -93,6 +92,9 @@ public final class DataManager{
return result;
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.DataManager#getName()
*/
public String getName(){
return name;
}
@ -121,22 +123,37 @@ public final class DataManager{
return dataFile;
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.DataManager#readItem(org.apache.activemq.kaha.Marshaller, org.apache.activemq.kaha.StoreLocation)
*/
public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException{
return getReader().readItem(marshaller,item);
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.DataManager#storeDataItem(org.apache.activemq.kaha.Marshaller, java.lang.Object)
*/
public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException{
return getWriter().storeItem(marshaller,payload, DATA_ITEM_TYPE);
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.DataManager#storeRedoItem(java.lang.Object)
*/
public synchronized StoreLocation storeRedoItem(Object payload) throws IOException{
return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.DataManager#updateItem(org.apache.activemq.kaha.StoreLocation, org.apache.activemq.kaha.Marshaller, java.lang.Object)
*/
public synchronized void updateItem(StoreLocation location,Marshaller marshaller, Object payload) throws IOException {
getWriter().updateItem((DataItem)location,marshaller,payload,DATA_ITEM_TYPE);
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.DataManager#recoverRedoItems(org.apache.activemq.kaha.impl.data.RedoListener)
*/
public synchronized void recoverRedoItems(RedoListener listener) throws IOException{
// Nothing to recover if there is no current file.
@ -179,6 +196,9 @@ public final class DataManager{
}
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.DataManager#close()
*/
public synchronized void close() throws IOException{
getWriter().close();
for(Iterator i=fileMap.values().iterator();i.hasNext();){
@ -189,6 +209,9 @@ public final class DataManager{
fileMap.clear();
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.DataManager#force()
*/
public synchronized void force() throws IOException{
for(Iterator i=fileMap.values().iterator();i.hasNext();){
DataFile dataFile=(DataFile) i.next();
@ -197,6 +220,9 @@ public final class DataManager{
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.DataManager#delete()
*/
public synchronized boolean delete() throws IOException{
boolean result=true;
for(Iterator i=fileMap.values().iterator();i.hasNext();){
@ -208,6 +234,9 @@ public final class DataManager{
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.DataManager#addInterestInFile(int)
*/
public synchronized void addInterestInFile(int file) throws IOException{
if(file>=0){
Integer key=new Integer(file);
@ -225,6 +254,9 @@ public final class DataManager{
}
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.DataManager#removeInterestInFile(int)
*/
public synchronized void removeInterestInFile(int file) throws IOException{
if(file>=0){
Integer key=new Integer(file);
@ -243,6 +275,9 @@ public final class DataManager{
}
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.DataManager#consolidateDataFiles()
*/
public synchronized void consolidateDataFiles() throws IOException{
List purgeList=new ArrayList();
for(Iterator i=fileMap.values().iterator();i.hasNext();){
@ -264,10 +299,16 @@ public final class DataManager{
log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.DataManager#getRedoMarshaller()
*/
public Marshaller getRedoMarshaller() {
return redoMarshaller;
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.DataManager#setRedoMarshaller(org.apache.activemq.kaha.Marshaller)
*/
public void setRedoMarshaller(Marshaller redoMarshaller) {
this.redoMarshaller = redoMarshaller;
}
@ -290,45 +331,30 @@ public final class DataManager{
return "DataManager:("+NAME_PREFIX+name+")";
}
public synchronized DataFileReader getReader() {
public synchronized SyncDataFileReader getReader() {
if( reader == null ) {
reader = createReader();
}
return reader;
}
protected synchronized DataFileReader createReader() {
if( useAsyncWriter ) {
return new AsyncDataFileReader(this, (AsyncDataFileWriter) getWriter());
} else {
return new SyncDataFileReader(this);
}
protected synchronized SyncDataFileReader createReader() {
return new SyncDataFileReader(this);
}
public synchronized void setReader(DataFileReader reader) {
public synchronized void setReader(SyncDataFileReader reader) {
this.reader = reader;
}
public synchronized DataFileWriter getWriter() {
public synchronized SyncDataFileWriter getWriter() {
if( writer==null ) {
writer = createWriter();
}
return writer;
}
private DataFileWriter createWriter() {
if( useAsyncWriter ) {
return new AsyncDataFileWriter(this);
} else {
return new SyncDataFileWriter(this);
}
private SyncDataFileWriter createWriter() {
return new SyncDataFileWriter(this);
}
public synchronized void setWriter(DataFileWriter writer) {
public synchronized void setWriter(SyncDataFileWriter writer) {
this.writer = writer;
}
public synchronized boolean isUseAsyncWriter() {
return useAsyncWriter;
}
public synchronized void setUseAsyncWriter(boolean useAsyncWriter) {
this.useAsyncWriter = useAsyncWriter;
}
}
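With the readers and writers now typed to DataManagerImpl, callers that only store, force, and read can stay on the extracted DataManager interface and never care which implementation sits behind it. A sketch of such a round trip; StringMarshaller is hypothetical, and the Marshaller method signatures are assumed from the writePayload/readPayload call sites above.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.DataManager;

class DataManagerRoundTripSketch {
    static class StringMarshaller implements Marshaller {
        public void writePayload(Object object, DataOutput dataOut) throws IOException {
            dataOut.writeUTF((String) object);
        }
        public Object readPayload(DataInput dataIn) throws IOException {
            return dataIn.readUTF();
        }
    }

    static String roundTrip(DataManager dm, String payload) throws IOException {
        Marshaller m = new StringMarshaller();
        StoreLocation loc = dm.storeDataItem(m, payload); // append the payload
        dm.force();                                       // push it to disk
        return (String) dm.readItem(m, loc);              // read it back by location
    }
}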

View File

@ -27,9 +27,9 @@ import org.apache.activemq.util.DataByteArrayInputStream;
*
* @version $Revision: 1.1.1.1 $
*/
final class SyncDataFileReader implements DataFileReader {
public final class SyncDataFileReader {
private DataManager dataManager;
private DataManagerImpl dataManager;
private DataByteArrayInputStream dataIn;
/**
@ -37,7 +37,7 @@ final class SyncDataFileReader implements DataFileReader {
*
* @param file
*/
SyncDataFileReader(DataManager fileManager){
SyncDataFileReader(DataManagerImpl fileManager){
this.dataManager=fileManager;
this.dataIn=new DataByteArrayInputStream();
}
@ -62,7 +62,7 @@ final class SyncDataFileReader implements DataFileReader {
// TODO: we could reuse the buffer in dataIn if it's big enough to avoid
// allocating byte[] arrays on every readItem.
byte[] data=new byte[item.getSize()];
file.seek(item.getOffset()+DataManager.ITEM_HEAD_SIZE);
file.seek(item.getOffset()+DataManagerImpl.ITEM_HEAD_SIZE);
file.readFully(data);
dataIn.restart(data);
return marshaller.readPayload(dataIn);

View File

@ -21,7 +21,6 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.util.DataByteArrayOutputStream;
/**
* Optimized Store writer. Synchronously marshalls and writes to the data file. Simple but
@ -29,10 +28,10 @@ import org.apache.activemq.util.DataByteArrayOutputStream;
*
* @version $Revision: 1.1.1.1 $
*/
final class SyncDataFileWriter implements DataFileWriter{
final public class SyncDataFileWriter {
private DataByteArrayOutputStream buffer;
private DataManager dataManager;
private DataManagerImpl dataManager;
/**
@ -40,7 +39,7 @@ final class SyncDataFileWriter implements DataFileWriter{
*
* @param file
*/
SyncDataFileWriter(DataManager fileManager){
SyncDataFileWriter(DataManagerImpl fileManager){
this.dataManager=fileManager;
this.buffer=new DataByteArrayOutputStream();
}
@ -52,10 +51,10 @@ final class SyncDataFileWriter implements DataFileWriter{
// Write the packet our internal buffer.
buffer.reset();
buffer.position(DataManager.ITEM_HEAD_SIZE);
buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
marshaller.writePayload(payload,buffer);
int size=buffer.size();
int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
int payloadSize=size-DataManagerImpl.ITEM_HEAD_SIZE;
buffer.reset();
buffer.writeByte(type);
buffer.writeInt(payloadSize);
@ -80,10 +79,10 @@ final class SyncDataFileWriter implements DataFileWriter{
public synchronized void updateItem(DataItem item,Marshaller marshaller, Object payload, byte type) throws IOException {
//Write the packet our internal buffer.
buffer.reset();
buffer.position(DataManager.ITEM_HEAD_SIZE);
buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
marshaller.writePayload(payload,buffer);
int size=buffer.size();
int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
int payloadSize=size-DataManagerImpl.ITEM_HEAD_SIZE;
buffer.reset();
buffer.writeByte(type);
buffer.writeInt(payloadSize);

View File

@ -26,7 +26,7 @@ import java.util.LinkedList;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.DataManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**

View File

@ -20,7 +20,7 @@ package org.apache.activemq.kaha.impl.index;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.DataManager;
import org.apache.activemq.util.DataByteArrayOutputStream;
/**
* Optimized Store writer

View File

@ -0,0 +1,157 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
/**
* Provides a base class for you to extend when you want an object to maintain
* a doubly linked list to other objects without using a collection class.
*
* @author chirino
*/
public class LinkedNode {
protected LinkedNode next=this;
protected LinkedNode prev=this;
protected boolean tail=true;
public LinkedNode getHeadNode() {
if( isHeadNode() ) {
return this;
}
if( isTailNode() ) {
return next;
}
LinkedNode rc = prev;
while(!rc.isHeadNode()) {
rc = rc.prev;
}
return rc;
}
public LinkedNode getTailNode() {
if( isTailNode() ) {
return this;
}
if( isHeadNode() ) {
return prev;
}
LinkedNode rc = next;
while(!rc.isTailNode()) {
rc = rc.next;
}
return rc;
}
public LinkedNode getNext() {
return tail ? null : next;
}
public LinkedNode getPrevious() {
return prev.tail ? null : prev;
}
public boolean isHeadNode() {
return prev.isTailNode();
}
public boolean isTailNode() {
return tail;
}
/**
* @param rightHead the node to link after this node.
* @return this
*/
public LinkedNode linkAfter(LinkedNode rightHead) {
if( rightHead == this ) {
throw new IllegalArgumentException("You cannot link to yourself");
}
if( !rightHead.isHeadNode() ) {
throw new IllegalArgumentException("You only insert nodes that are the first in a list");
}
LinkedNode rightTail = rightHead.prev;
if( tail ) {
tail = false;
} else {
rightTail.tail=false;
}
rightHead.prev = this; // link the head of the right side.
rightTail.next = next; // link the tail of the right side
next.prev = rightTail; // link the head of the left side
next = rightHead; // link the tail of the left side.
return this;
}
/**
* @param leftHead the node to link before this node.
* @return leftHead
*/
public LinkedNode linkBefore(LinkedNode leftHead) {
if( leftHead == this ) {
throw new IllegalArgumentException("You cannot link to yourself");
}
if( !leftHead.isHeadNode() ) {
throw new IllegalArgumentException("You only insert nodes that are the first in a list");
}
// The left side is no longer going to be a tail..
LinkedNode leftTail = leftHead.prev;
leftTail.tail = false;
leftTail.next = this; // link the tail of the left side.
leftHead.prev = prev; // link the head of the left side.
prev.next = leftHead; // link the tail of the right side.
prev = leftTail; // link the head of the right side.
return leftHead;
}
/**
* Removes this node out of the linked list it is chained in.
*/
public void unlink() {
// If we are already unlinked...
if( prev==this ) {
return;
}
if( tail ) {
prev.tail = true;
}
// Update the peers links..
next.prev = prev;
prev.next = next;
// Update our links..
next = this;
prev = this;
tail=true;
}
}
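A minimal usage sketch for LinkedNode, with a hypothetical Job element type: each element carries its own links, so the list needs no java.util collection.

import org.apache.activemq.util.LinkedNode;

class IntrusiveListSketch {
    static class Job extends LinkedNode {
        final String name;
        Job(String name) { this.name = name; }
    }

    public static void main(String[] args) {
        Job a = new Job("a"), b = new Job("b"), c = new Job("c");
        a.linkAfter(b.linkAfter(c)); // list is now a, b, c
        for (LinkedNode n = a.getHeadNode(); n != null; n = n.getNext()) {
            System.out.println(((Job) n).name); // prints a, b, c
        }
    }
}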

View File

@ -18,13 +18,13 @@ package org.apache.activemq.kaha.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.apache.activemq.kaha.IndexTypes;
import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.kaha.impl.KahaStore;
import org.apache.activemq.kaha.impl.container.ContainerId;
import org.apache.activemq.kaha.impl.container.ListContainerImpl;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexManager;
/**

View File

@ -0,0 +1,180 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.async;
import java.io.File;
import java.io.IOException;
import junit.framework.TestCase;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.kaha.impl.async.JournalFacade.RecordLocationFacade;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Tests the AsyncDataManager-based Journal
*
* @version $Revision: 1.1 $
*/
public class JournalImplTest extends TestCase {
Log log = LogFactory.getLog(JournalImplTest.class);
int size = 1024*10;
int logFileCount=2;
File logDirectory = new File("target/dm-data2");
private JournalFacade journal;
/**
* @see junit.framework.TestCase#setUp()
*/
protected void setUp() throws Exception {
if( logDirectory.exists() ) {
deleteDir(logDirectory);
}
assertTrue("Could not delete directory: "+logDirectory.getCanonicalPath(), !logDirectory.exists() );
AsyncDataManager dm = new AsyncDataManager();
dm.setDirectory(logDirectory);
dm.setMaxFileLength(1024*64);
dm.start();
journal = new JournalFacade(dm);
}
/**
* Delete the given directory and the files directly inside it.
*/
private void deleteDir(File f) {
File[] files = f.listFiles();
for (int i = 0; i < files.length; i++) {
File file = files[i];
file.delete();
}
f.delete();
}
protected void tearDown() throws Exception {
journal.close();
if( logDirectory.exists() )
deleteDir(logDirectory);
//assertTrue( !logDirectory.exists() );
}
public void testLogFileCreation() throws IOException {
RecordLocation mark = journal.getMark();
assertNull(mark);
}
@SuppressWarnings("unchecked")
public void testAppendAndRead() throws InvalidRecordLocationException, InterruptedException, IOException {
Packet data1 = createPacket("Hello World 1");
RecordLocation location1 = journal.write( data1, false);
Packet data2 = createPacket("Hello World 2");
RecordLocation location2 = journal.write( data2, false);
Packet data3 = createPacket("Hello World 3");
RecordLocation location3 = journal.write( data3, false);
//Thread.sleep(1000);
// Now see if we can read that data.
Packet data;
data = journal.read(location2);
assertEquals( data2, data);
data = journal.read(location1);
assertEquals( data1, data);
data = journal.read(location3);
assertEquals( data3, data);
// Can we cursor the data?
RecordLocation l=journal.getNextRecordLocation(null);
int t = l.compareTo(location1);
assertEquals(0, t);
data = journal.read(l);
assertEquals( data1, data);
l=journal.getNextRecordLocation(l);
assertEquals(0, l.compareTo(location2));
data = journal.read(l);
assertEquals( data2, data);
l=journal.getNextRecordLocation(l);
assertEquals(0, l.compareTo(location3));
data = journal.read(l);
assertEquals( data3, data);
l=journal.getNextRecordLocation(l);
assertNull(l);
log.info(journal);
}
public void testCanReadFromArchivedLogFile() throws InvalidRecordLocationException, InterruptedException, IOException {
Packet data1 = createPacket("Hello World 1");
RecordLocationFacade location1 = (RecordLocationFacade) journal.write( data1, false);
RecordLocationFacade pos;
int counter = 0;
do {
Packet p = createPacket("<<<data>>>");
pos = (RecordLocationFacade) journal.write( p, false);
if( counter++ % 1000 == 0 ) {
journal.setMark(pos, false);
}
} while( pos.getLocation().getDataFileId() < 5 );
// Now see if we can read that first packet.
Packet data;
data = journal.read(location1);
assertEquals( data1, data);
}
/**
* @param string
* @return
*/
private Packet createPacket(String string) {
return new ByteArrayPacket(string.getBytes());
}
public static void assertEquals(Packet arg0, Packet arg1) {
assertEquals(arg0.sliceAsBytes(), arg1.sliceAsBytes());
}
public static void assertEquals(byte[] arg0, byte[] arg1) {
// System.out.println("Comparing: "+new String(arg0)+" and "+new String(arg1));
if( arg0==null ^ arg1==null )
fail("Not equal: "+arg0+" != "+arg1);
if( arg0==null )
return;
if( arg0.length!=arg1.length)
fail("Array lenght not equal: "+arg0.length+" != "+arg1.length);
for( int i=0; i<arg0.length;i++) {
if( arg0[i]!= arg1[i]) {
fail("Array item not equal at index "+i+": "+arg0[i]+" != "+arg1[i]);
}
}
}
}

View File

@ -0,0 +1,82 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.async;
import java.io.File;
import java.io.IOException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalPerfToolSupport;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
/**
* A performance statistics gathering tool for the AsyncDataManager-based Journal.
*
* @version $Revision: 1.1 $
*/
public class JournalPerfTool extends JournalPerfToolSupport {
private int logFileSize = 1024*1024*50;
public static void main(String[] args) throws Exception {
JournalPerfTool tool = new JournalPerfTool();
tool.initialWorkers=10;
tool.syncFrequency=15;
tool.workerIncrement=0;
tool.workerThinkTime=0;
tool.verbose=false;
tool.incrementDelay=5*1000;
if( args.length > 0 ) {
tool.journalDirectory = new File(args[0]);
}
if( args.length > 1 ) {
tool.workerIncrement = Integer.parseInt(args[1]);
}
if( args.length > 2 ) {
tool.incrementDelay = Long.parseLong(args[2]);
}
if( args.length > 3 ) {
tool.verbose = Boolean.parseBoolean(args[3]);
}
if( args.length > 4 ) {
tool.recordSize = Integer.parseInt(args[4]);
}
if( args.length > 5 ) {
tool.syncFrequency = Integer.parseInt(args[5]);
}
if( args.length > 6 ) {
tool.workerThinkTime = Integer.parseInt(args[6]);
}
tool.exec();
}
/**
* @throws IOException
* @see org.apache.activeio.journal.JournalPerfToolSupport#createJournal()
*/
public Journal createJournal() throws IOException {
AsyncDataManager dm = new AsyncDataManager();
dm.setMaxFileLength(logFileSize);
dm.setDirectory(this.journalDirectory);
dm.start();
return new JournalFacade(dm);
}
}

View File

@ -0,0 +1,83 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.async;
import java.io.File;
import java.io.IOException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalRWPerfToolSupport;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
/**
* A performance statistics gathering tool for the AsyncDataManager-based Journal.
*
* @version $Revision: 1.1 $
*/
public class JournalRWPerfTool extends JournalRWPerfToolSupport {
private int logFileSize = 1024*1024*50;
public static void main(String[] args) throws Exception {
JournalRWPerfTool tool = new JournalRWPerfTool();
tool.initialWriteWorkers=10;
tool.syncFrequency=15;
tool.writeWorkerIncrement=0;
tool.writeWorkerThinkTime=0;
tool.verbose=false;
tool.incrementDelay=5*1000;
if( args.length > 0 ) {
tool.journalDirectory = new File(args[0]);
}
if( args.length > 1 ) {
tool.writeWorkerIncrement = Integer.parseInt(args[1]);
}
if( args.length > 2 ) {
tool.incrementDelay = Long.parseLong(args[2]);
}
if( args.length > 3 ) {
tool.verbose = Boolean.parseBoolean(args[3]);
}
if( args.length > 4 ) {
tool.recordSize = Integer.parseInt(args[4]);
}
if( args.length > 5 ) {
tool.syncFrequency = Integer.parseInt(args[5]);
}
if( args.length > 6 ) {
tool.writeWorkerThinkTime = Integer.parseInt(args[6]);
}
tool.exec();
}
/**
* @throws IOException
* @see org.apache.activeio.journal.JournalRWPerfToolSupport#createJournal()
*/
public Journal createJournal() throws IOException {
AsyncDataManager dm = new AsyncDataManager();
dm.setMaxFileLength(logFileSize);
dm.setDirectory(this.journalDirectory);
dm.start();
return new JournalFacade(dm);
}
}

View File

@ -0,0 +1,62 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.async;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.kaha.impl.async.JournalFacade.RecordLocationFacade;
import junit.framework.TestCase;
/**
* Tests the Location Class
*
* @version $Revision: 1.1 $
*/
public class LocationTest extends TestCase {
@SuppressWarnings("unchecked")
synchronized public void testRecordLocationImplComparison() throws IOException {
Location l1 = new Location();
l1.setDataFileId(0);
l1.setOffset(5);
Location l2 = new Location(l1);
l2.setOffset(10);
Location l3 = new Location(l2);
l3.setDataFileId(2);
l3.setOffset(0);
assertTrue( l1.compareTo(l2)<0 );
// Sort them using a list. Put them in the wrong order.
ArrayList<RecordLocationFacade> l = new ArrayList<RecordLocationFacade>();
l.add(new RecordLocationFacade(l2));
l.add(new RecordLocationFacade(l3));
l.add(new RecordLocationFacade(l1));
Collections.sort(l);
// Did they get sorted to the correct order?
System.out.println(l.get(0));
assertSame( l.get(0).getLocation(), l1 );
assertSame( l.get(1).getLocation(), l2 );
assertSame( l.get(2).getLocation(), l3 );
}
}

View File

@ -0,0 +1,176 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import junit.framework.TestCase;
/**
*
* @author chirino
*/
public class LinkedNodeTest extends TestCase {
static class IntLinkedNode extends LinkedNode {
public final int v;
public IntLinkedNode(int v){
this.v = v;
}
@Override
public String toString() {
return ""+v;
}
}
IntLinkedNode i1 = new IntLinkedNode(1);
IntLinkedNode i2 = new IntLinkedNode(2);
IntLinkedNode i3 = new IntLinkedNode(3);
IntLinkedNode i4 = new IntLinkedNode(4);
IntLinkedNode i5 = new IntLinkedNode(5);
IntLinkedNode i6 = new IntLinkedNode(6);
public void testLinkAfter() {
i1.linkAfter(i2.linkAfter(i3));
// Order should be 1,2,3
assertTrue( i1.getNext() == i2 );
assertTrue( i1.getNext().getNext() == i3 );
assertNull( i1.getNext().getNext().getNext() );
assertTrue( i3.getPrevious() == i2 );
assertTrue( i3.getPrevious().getPrevious() == i1 );
assertNull( i3.getPrevious().getPrevious().getPrevious() );
assertTrue( i1.isHeadNode() );
assertFalse(i1.isTailNode() );
assertFalse(i2.isHeadNode() );
assertFalse(i2.isTailNode() );
assertTrue( i3.isTailNode() );
assertFalse(i3.isHeadNode() );
i1.linkAfter(i4.linkAfter(i5));
// Order should be 1,4,5,2,3
assertTrue( i1.getNext() == i4 );
assertTrue( i1.getNext().getNext() == i5 );
assertTrue( i1.getNext().getNext().getNext() == i2 );
assertTrue( i1.getNext().getNext().getNext().getNext() == i3 );
assertNull( i1.getNext().getNext().getNext().getNext().getNext() );
assertTrue( i3.getPrevious() == i2 );
assertTrue( i3.getPrevious().getPrevious() == i5 );
assertTrue( i3.getPrevious().getPrevious().getPrevious() == i4 );
assertTrue( i3.getPrevious().getPrevious().getPrevious().getPrevious() == i1 );
assertNull( i3.getPrevious().getPrevious().getPrevious().getPrevious().getPrevious() );
assertTrue( i1.isHeadNode() );
assertFalse(i1.isTailNode() );
assertFalse(i4.isHeadNode() );
assertFalse(i4.isTailNode() );
assertFalse(i5.isHeadNode() );
assertFalse(i5.isTailNode() );
assertFalse(i2.isHeadNode() );
assertFalse(i2.isTailNode() );
assertTrue( i3.isTailNode() );
assertFalse(i3.isHeadNode() );
}
public void testLinkBefore() {
i3.linkBefore(i2.linkBefore(i1));
assertTrue( i1.getNext() == i2 );
assertTrue( i1.getNext().getNext() == i3 );
assertNull( i1.getNext().getNext().getNext() );
assertTrue( i3.getPrevious() == i2 );
assertTrue( i3.getPrevious().getPrevious() == i1 );
assertNull( i3.getPrevious().getPrevious().getPrevious() );
assertTrue( i1.isHeadNode() );
assertFalse(i1.isTailNode() );
assertFalse(i2.isHeadNode() );
assertFalse(i2.isTailNode() );
assertTrue( i3.isTailNode() );
assertFalse(i3.isHeadNode() );
i2.linkBefore(i5.linkBefore(i4));
// Order should be 1,4,5,2,3
assertTrue( i1.getNext() == i4 );
assertTrue( i1.getNext().getNext() == i5 );
assertTrue( i1.getNext().getNext().getNext() == i2 );
assertTrue( i1.getNext().getNext().getNext().getNext() == i3 );
assertNull( i1.getNext().getNext().getNext().getNext().getNext() );
assertTrue( i3.getPrevious() == i2 );
assertTrue( i3.getPrevious().getPrevious() == i5 );
assertTrue( i3.getPrevious().getPrevious().getPrevious() == i4 );
assertTrue( i3.getPrevious().getPrevious().getPrevious().getPrevious() == i1 );
assertNull( i3.getPrevious().getPrevious().getPrevious().getPrevious().getPrevious() );
assertTrue( i1.isHeadNode() );
assertFalse(i1.isTailNode() );
assertFalse(i4.isHeadNode() );
assertFalse(i4.isTailNode() );
assertFalse(i5.isHeadNode() );
assertFalse(i5.isTailNode() );
assertFalse(i2.isHeadNode() );
assertFalse(i2.isTailNode() );
assertTrue( i3.isTailNode() );
assertFalse(i3.isHeadNode() );
}
public void testUnlink() {
i1.linkAfter(i2.linkAfter(i3));
i3.linkAfter(i4);
i1.linkBefore(i5);
i1.linkAfter(i6);
// Order should be 5,1,6,2,3,4
i4.unlink();
i5.unlink();
i6.unlink();
// Order should be 1,2,3
assertTrue( i1.getNext() == i2 );
assertTrue( i1.getNext().getNext() == i3 );
assertNull( i1.getNext().getNext().getNext() );
assertTrue( i3.getPrevious() == i2 );
assertTrue( i3.getPrevious().getPrevious() == i1 );
assertNull( i3.getPrevious().getPrevious().getPrevious() );
assertTrue( i1.isHeadNode() );
assertFalse(i1.isTailNode() );
assertFalse(i2.isHeadNode() );
assertFalse(i2.isTailNode() );
assertTrue( i3.isTailNode() );
assertFalse(i3.isHeadNode() );
}
}

View File

@ -215,6 +215,12 @@
<groupId>org.apache.activemq</groupId>
<artifactId>activeio-core</artifactId>
<version>3.1-incubator-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activeio-core</artifactId>
<version>3.1-incubator-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>