From 4dcf75f48204c830b86bc606522976da53645a04 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Wed, 7 Nov 2012 19:37:24 +0000 Subject: [PATCH] Add files that were part of my .gitignore. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1406770 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/kaha/impl/data/DataFile.java | 124 ++++++ .../activemq/kaha/impl/data/DataItem.java | 101 +++++ .../kaha/impl/data/DataManagerImpl.java | 408 ++++++++++++++++++ .../apache/activemq/kaha/impl/data/Item.java | 30 ++ .../activemq/kaha/impl/data/RedoListener.java | 26 ++ .../kaha/impl/data/SyncDataFileReader.java | 75 ++++ .../kaha/impl/data/SyncDataFileWriter.java | 115 +++++ 7 files changed, 879 insertions(+) create mode 100644 activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java create mode 100755 activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java create mode 100644 activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java create mode 100755 activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/Item.java create mode 100644 activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java create mode 100644 activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java create mode 100644 activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java diff --git a/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java b/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java new file mode 100644 index 0000000000..62caaf76a4 --- /dev/null +++ b/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.data; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; + +/** + * DataFile + * + * + */ +class DataFile { + + private final File file; + private final Integer number; + private int referenceCount; + private RandomAccessFile randomAcessFile; + private Object writerData; + private long length; + private boolean dirty; + + DataFile(File file, int number) { + this.file = file; + this.number = Integer.valueOf(number); + length = file.exists() ? file.length() : 0; + } + + Integer getNumber() { + return number; + } + + synchronized RandomAccessFile getRandomAccessFile() throws FileNotFoundException { + if (randomAcessFile == null) { + randomAcessFile = new RandomAccessFile(file, "rw"); + } + return randomAcessFile; + } + + synchronized long getLength() { + return length; + } + + synchronized void incrementLength(int size) { + length += size; + } + + synchronized void purge() throws IOException { + if (randomAcessFile != null) { + randomAcessFile.close(); + randomAcessFile = null; + } + } + + synchronized boolean delete() throws IOException { + purge(); + return file.delete(); + } + + synchronized void close() throws IOException { + if (randomAcessFile != null) { + randomAcessFile.close(); + } + } + + synchronized int increment() { + return ++referenceCount; + } + + synchronized int decrement() { + return --referenceCount; + } + + synchronized boolean isUnused() { + return referenceCount <= 0; + } + + public String toString() { + String result = file.getName() + " number = " + number + " , length = " + length + " refCount = " + referenceCount; + return result; + } + + /** + * @return Opaque data that a DataFileWriter may want to associate with the + * DataFile. + */ + public synchronized Object getWriterData() { + return writerData; + } + + /** + * @param writerData - Opaque data that a DataFileWriter may want to + * associate with the DataFile. + */ + public synchronized void setWriterData(Object writerData) { + this.writerData = writerData; + dirty = true; + } + + public synchronized boolean isDirty() { + return dirty; + } + + public synchronized void setDirty(boolean value) { + this.dirty = value; + } + +} diff --git a/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java b/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java new file mode 100755 index 0000000000..0970da9c33 --- /dev/null +++ b/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.data; + +import org.apache.activemq.kaha.StoreLocation; + +/** + * A a wrapper for a data in the store + * + * + */ +public final class DataItem implements Item, StoreLocation { + + private int file = (int)POSITION_NOT_SET; + private long offset = POSITION_NOT_SET; + private int size; + + public DataItem() { + } + + DataItem(DataItem item) { + this.file = item.file; + this.offset = item.offset; + this.size = item.size; + } + + boolean isValid() { + return file != POSITION_NOT_SET; + } + + /** + * @return + * @see org.apache.activemq.kaha.StoreLocation#getSize() + */ + public int getSize() { + return size; + } + + /** + * @param size The size to set. + */ + public void setSize(int size) { + this.size = size; + } + + /** + * @return + * @see org.apache.activemq.kaha.StoreLocation#getOffset() + */ + public long getOffset() { + return offset; + } + + /** + * @param offset The offset to set. + */ + public void setOffset(long offset) { + this.offset = offset; + } + + /** + * @return + * @see org.apache.activemq.kaha.StoreLocation#getFile() + */ + public int getFile() { + return file; + } + + /** + * @param file The file to set. + */ + public void setFile(int file) { + this.file = file; + } + + /** + * @return a pretty print + */ + public String toString() { + String result = "offset = " + offset + ", file = " + file + ", size = " + size; + return result; + } + + public DataItem copy() { + return new DataItem(this); + } +} diff --git a/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java b/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java new file mode 100644 index 0000000000..848b39f165 --- /dev/null +++ b/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.data; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq.kaha.Marshaller; +import org.apache.activemq.kaha.StoreLocation; +import org.apache.activemq.kaha.impl.DataManager; +import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IOHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages DataFiles + * + * + */ +public final class DataManagerImpl implements DataManager { + + public static final int ITEM_HEAD_SIZE = 5; // type + length + public static final byte DATA_ITEM_TYPE = 1; + public static final byte REDO_ITEM_TYPE = 2; + public static final long MAX_FILE_LENGTH = 1024 * 1024 * 32; + + private static final Logger LOG = LoggerFactory.getLogger(DataManagerImpl.class); + private static final String NAME_PREFIX = "data-"; + + private final File directory; + private final String name; + private SyncDataFileReader reader; + private SyncDataFileWriter writer; + private DataFile currentWriteFile; + private long maxFileLength = MAX_FILE_LENGTH; + private Map fileMap = new HashMap(); + private Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER; + private String dataFilePrefix; + private final AtomicLong storeSize; + + public DataManagerImpl(File dir, final String name,AtomicLong storeSize) { + this.directory = dir; + this.name = name; + this.storeSize=storeSize; + + dataFilePrefix = IOHelper.toFileSystemSafeName(NAME_PREFIX + name + "-"); + // build up list of current dataFiles + File[] files = dir.listFiles(new FilenameFilter() { + public boolean accept(File dir, String n) { + return dir.equals(directory) && n.startsWith(dataFilePrefix); + } + }); + if (files != null) { + for (int i = 0; i < files.length; i++) { + File file = files[i]; + String n = file.getName(); + String numStr = n.substring(dataFilePrefix.length(), n.length()); + int num = Integer.parseInt(numStr); + DataFile dataFile = new DataFile(file, num); + storeSize.addAndGet(dataFile.getLength()); + fileMap.put(dataFile.getNumber(), dataFile); + if (currentWriteFile == null || currentWriteFile.getNumber().intValue() < num) { + currentWriteFile = dataFile; + } + } + } + } + + private DataFile createAndAddDataFile(int num) { + String fileName = dataFilePrefix + num; + File file = new File(directory, fileName); + DataFile result = new DataFile(file, num); + fileMap.put(result.getNumber(), result); + return result; + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.IDataManager#getName() + */ + public String getName() { + return name; + } + + synchronized DataFile findSpaceForData(DataItem item) throws IOException { + if (currentWriteFile == null || ((currentWriteFile.getLength() + item.getSize()) > maxFileLength)) { + int nextNum = currentWriteFile != null ? currentWriteFile.getNumber().intValue() + 1 : 1; + if (currentWriteFile != null && currentWriteFile.isUnused()) { + removeDataFile(currentWriteFile); + } + currentWriteFile = createAndAddDataFile(nextNum); + } + item.setOffset(currentWriteFile.getLength()); + item.setFile(currentWriteFile.getNumber().intValue()); + int len = item.getSize() + ITEM_HEAD_SIZE; + currentWriteFile.incrementLength(len); + storeSize.addAndGet(len); + return currentWriteFile; + } + + DataFile getDataFile(StoreLocation item) throws IOException { + Integer key = Integer.valueOf(item.getFile()); + DataFile dataFile = fileMap.get(key); + if (dataFile == null) { + LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); + throw new IOException("Could not locate data file " + NAME_PREFIX + name + "-" + item.getFile()); + } + return dataFile; + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.IDataManager#readItem(org.apache.activemq.kaha.Marshaller, + * org.apache.activemq.kaha.StoreLocation) + */ + public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException { + return getReader().readItem(marshaller, item); + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.IDataManager#storeDataItem(org.apache.activemq.kaha.Marshaller, + * java.lang.Object) + */ + public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException { + return getWriter().storeItem(marshaller, payload, DATA_ITEM_TYPE); + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.IDataManager#storeRedoItem(java.lang.Object) + */ + public synchronized StoreLocation storeRedoItem(Object payload) throws IOException { + return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE); + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.IDataManager#updateItem(org.apache.activemq.kaha.StoreLocation, + * org.apache.activemq.kaha.Marshaller, java.lang.Object) + */ + public synchronized void updateItem(StoreLocation location, Marshaller marshaller, Object payload) + throws IOException { + getWriter().updateItem((DataItem)location, marshaller, payload, DATA_ITEM_TYPE); + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.IDataManager#recoverRedoItems(org.apache.activemq.kaha.impl.data.RedoListener) + */ + public synchronized void recoverRedoItems(RedoListener listener) throws IOException { + + // Nothing to recover if there is no current file. + if (currentWriteFile == null) { + return; + } + + DataItem item = new DataItem(); + item.setFile(currentWriteFile.getNumber().intValue()); + item.setOffset(0); + while (true) { + byte type; + try { + type = getReader().readDataItemSize(item); + } catch (IOException ignore) { + LOG.trace("End of data file reached at (header was invalid): " + item); + return; + } + if (type == REDO_ITEM_TYPE) { + // Un-marshal the redo item + Object object; + try { + object = readItem(redoMarshaller, item); + } catch (IOException e1) { + LOG.trace("End of data file reached at (payload was invalid): " + item); + return; + } + try { + + listener.onRedoItem(item, object); + // in case the listener is holding on to item references, + // copy it + // so we don't change it behind the listener's back. + item = item.copy(); + + } catch (Exception e) { + throw IOExceptionSupport.create("Recovery handler failed: " + e, e); + } + } + // Move to the next item. + item.setOffset(item.getOffset() + ITEM_HEAD_SIZE + item.getSize()); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.IDataManager#close() + */ + public synchronized void close() throws IOException { + getWriter().close(); + for (Iterator i = fileMap.values().iterator(); i.hasNext();) { + DataFile dataFile = i.next(); + getWriter().force(dataFile); + dataFile.close(); + } + fileMap.clear(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.IDataManager#force() + */ + public synchronized void force() throws IOException { + for (Iterator i = fileMap.values().iterator(); i.hasNext();) { + DataFile dataFile = i.next(); + getWriter().force(dataFile); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.IDataManager#delete() + */ + public synchronized boolean delete() throws IOException { + boolean result = true; + for (Iterator i = fileMap.values().iterator(); i.hasNext();) { + DataFile dataFile = i.next(); + storeSize.addAndGet(-dataFile.getLength()); + result &= dataFile.delete(); + } + fileMap.clear(); + return result; + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.IDataManager#addInterestInFile(int) + */ + public synchronized void addInterestInFile(int file) throws IOException { + if (file >= 0) { + Integer key = Integer.valueOf(file); + DataFile dataFile = fileMap.get(key); + if (dataFile == null) { + dataFile = createAndAddDataFile(file); + } + addInterestInFile(dataFile); + } + } + + synchronized void addInterestInFile(DataFile dataFile) { + if (dataFile != null) { + dataFile.increment(); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.IDataManager#removeInterestInFile(int) + */ + public synchronized void removeInterestInFile(int file) throws IOException { + if (file >= 0) { + Integer key = Integer.valueOf(file); + DataFile dataFile = fileMap.get(key); + removeInterestInFile(dataFile); + } + } + + synchronized void removeInterestInFile(DataFile dataFile) throws IOException { + if (dataFile != null) { + + if (dataFile.decrement() <= 0) { + if (dataFile != currentWriteFile) { + removeDataFile(dataFile); + } + } + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.IDataManager#consolidateDataFiles() + */ + public synchronized void consolidateDataFiles() throws IOException { + List purgeList = new ArrayList(); + for (Iterator i = fileMap.values().iterator(); i.hasNext();) { + DataFile dataFile = i.next(); + if (dataFile.isUnused() && dataFile != currentWriteFile) { + purgeList.add(dataFile); + } + } + for (int i = 0; i < purgeList.size(); i++) { + DataFile dataFile = purgeList.get(i); + removeDataFile(dataFile); + } + } + + private void removeDataFile(DataFile dataFile) throws IOException { + fileMap.remove(dataFile.getNumber()); + if (writer != null) { + writer.force(dataFile); + } + storeSize.addAndGet(-dataFile.getLength()); + boolean result = dataFile.delete(); + LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed")); + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.IDataManager#getRedoMarshaller() + */ + public Marshaller getRedoMarshaller() { + return redoMarshaller; + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.IDataManager#setRedoMarshaller(org.apache.activemq.kaha.Marshaller) + */ + public void setRedoMarshaller(Marshaller redoMarshaller) { + this.redoMarshaller = redoMarshaller; + } + + /** + * @return the maxFileLength + */ + public long getMaxFileLength() { + return maxFileLength; + } + + /** + * @param maxFileLength the maxFileLength to set + */ + public void setMaxFileLength(long maxFileLength) { + this.maxFileLength = maxFileLength; + } + + public String toString() { + return "DataManager:(" + NAME_PREFIX + name + ")"; + } + + public synchronized SyncDataFileReader getReader() { + if (reader == null) { + reader = createReader(); + } + return reader; + } + + protected synchronized SyncDataFileReader createReader() { + return new SyncDataFileReader(this); + } + + public synchronized void setReader(SyncDataFileReader reader) { + this.reader = reader; + } + + public synchronized SyncDataFileWriter getWriter() { + if (writer == null) { + writer = createWriter(); + } + return writer; + } + + private SyncDataFileWriter createWriter() { + return new SyncDataFileWriter(this); + } + + public synchronized void setWriter(SyncDataFileWriter writer) { + this.writer = writer; + } + +} diff --git a/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/Item.java b/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/Item.java new file mode 100755 index 0000000000..ab668e4f83 --- /dev/null +++ b/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/Item.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.data; + +/** + * A a wrapper for a data in the store + * + * + */ +public interface Item { + long POSITION_NOT_SET = -1; + short MAGIC = 31317; + int ACTIVE = 22; + int FREE = 33; + int LOCATION_SIZE = 24; +} diff --git a/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java b/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java new file mode 100644 index 0000000000..970fbd5865 --- /dev/null +++ b/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.data; + +import org.apache.activemq.kaha.StoreLocation; + + +public interface RedoListener { + + void onRedoItem(StoreLocation item, Object object) throws Exception; + +} diff --git a/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java b/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java new file mode 100644 index 0000000000..8371a8d325 --- /dev/null +++ b/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.data; + +import java.io.IOException; +import java.io.RandomAccessFile; +import org.apache.activemq.kaha.Marshaller; +import org.apache.activemq.kaha.StoreLocation; +import org.apache.activemq.util.DataByteArrayInputStream; + +/** + * Optimized Store reader + * + * + */ +public final class SyncDataFileReader { + + private DataManagerImpl dataManager; + private DataByteArrayInputStream dataIn; + + /** + * Construct a Store reader + * + * @param fileId + */ + SyncDataFileReader(DataManagerImpl fileManager) { + this.dataManager = fileManager; + this.dataIn = new DataByteArrayInputStream(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.DataFileReader#readDataItemSize(org.apache.activemq.kaha.impl.data.DataItem) + */ + public synchronized byte readDataItemSize(DataItem item) throws IOException { + RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile(); + file.seek(item.getOffset()); // jump to the size field + byte rc = file.readByte(); + item.setSize(file.readInt()); + return rc; + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.DataFileReader#readItem(org.apache.activemq.kaha.Marshaller, + * org.apache.activemq.kaha.StoreLocation) + */ + public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException { + RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile(); + + // TODO: we could reuse the buffer in dataIn if it's big enough to avoid + // allocating byte[] arrays on every readItem. + byte[] data = new byte[item.getSize()]; + file.seek(item.getOffset() + DataManagerImpl.ITEM_HEAD_SIZE); + file.readFully(data); + dataIn.restart(data); + return marshaller.readPayload(dataIn); + } +} diff --git a/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java b/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java new file mode 100644 index 0000000000..ca39ea54a9 --- /dev/null +++ b/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.data; + +import java.io.IOException; +import java.io.RandomAccessFile; + +import org.apache.activemq.kaha.Marshaller; +import org.apache.activemq.util.DataByteArrayOutputStream; + +/** + * Optimized Store writer. Synchronously marshalls and writes to the data file. + * Simple but may introduce a bit of contention when put under load. + * + * + */ +public final class SyncDataFileWriter { + + private DataByteArrayOutputStream buffer; + private DataManagerImpl dataManager; + + /** + * Construct a Store writer + * + * @param fileId + */ + SyncDataFileWriter(DataManagerImpl fileManager) { + this.dataManager = fileManager; + this.buffer = new DataByteArrayOutputStream(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.DataFileWriter#storeItem(org.apache.activemq.kaha.Marshaller, + * java.lang.Object, byte) + */ + public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type) + throws IOException { + + // Write the packet our internal buffer. + buffer.reset(); + buffer.position(DataManagerImpl.ITEM_HEAD_SIZE); + marshaller.writePayload(payload, buffer); + int size = buffer.size(); + int payloadSize = size - DataManagerImpl.ITEM_HEAD_SIZE; + buffer.reset(); + buffer.writeByte(type); + buffer.writeInt(payloadSize); + + // Find the position where this item will land at. + DataItem item = new DataItem(); + item.setSize(payloadSize); + DataFile dataFile = dataManager.findSpaceForData(item); + + // Now splat the buffer to the file. + dataFile.getRandomAccessFile().seek(item.getOffset()); + dataFile.getRandomAccessFile().write(buffer.getData(), 0, size); + dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker.. + + dataManager.addInterestInFile(dataFile); + return item; + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.kaha.impl.data.DataFileWriter#updateItem(org.apache.activemq.kaha.StoreLocation, + * org.apache.activemq.kaha.Marshaller, java.lang.Object, byte) + */ + public synchronized void updateItem(DataItem item, Marshaller marshaller, Object payload, byte type) + throws IOException { + // Write the packet our internal buffer. + buffer.reset(); + buffer.position(DataManagerImpl.ITEM_HEAD_SIZE); + marshaller.writePayload(payload, buffer); + int size = buffer.size(); + int payloadSize = size - DataManagerImpl.ITEM_HEAD_SIZE; + buffer.reset(); + buffer.writeByte(type); + buffer.writeInt(payloadSize); + item.setSize(payloadSize); + DataFile dataFile = dataManager.getDataFile(item); + RandomAccessFile file = dataFile.getRandomAccessFile(); + file.seek(item.getOffset()); + file.write(buffer.getData(), 0, size); + dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker.. + } + + public synchronized void force(DataFile dataFile) throws IOException { + // If our dirty marker was set.. then we need to sync + if (dataFile.getWriterData() != null && dataFile.isDirty()) { + dataFile.getRandomAccessFile().getFD().sync(); + dataFile.setWriterData(null); + dataFile.setDirty(false); + } + } + + public void close() throws IOException { + } +}