From d1e7d69fc13e377b846688de82795aa393a259eb Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Tue, 23 Aug 2011 13:34:09 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3466: IndexOutOfBounds in kahadb with large number of subscriptions and pending messages. use long locations such that temp file appends do not overflow and ensure page file overflow does not leave oversized chunk, link pages till overflow fits in a page, + some additional tests git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1160681 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/kahadb/page/PageFile.java | 10 ++-- .../org/apache/kahadb/page/Transaction.java | 54 +++++++++-------- .../apache/kahadb/index/BTreeIndexTest.java | 60 +++++++++++++++++-- 3 files changed, 88 insertions(+), 36 deletions(-) diff --git a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java index 5ba3c147be..61498f10db 100644 --- a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java +++ b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java @@ -140,8 +140,8 @@ public class PageFile { Page page; byte[] current; byte[] diskBound; - int currentLocation = -1; - int diskBoundLocation = -1; + long currentLocation = -1; + long diskBoundLocation = -1; File tmpFile; int length; @@ -150,7 +150,7 @@ public class PageFile { current=data; } - public PageWrite(Page page, int currentLocation, int length, File tmpFile) { + public PageWrite(Page page, long currentLocation, int length, File tmpFile) { this.page = page; this.currentLocation = currentLocation; this.tmpFile = tmpFile; @@ -164,7 +164,7 @@ public class PageFile { diskBoundLocation = -1; } - public void setCurrentLocation(Page page, int location, int length) { + public void setCurrentLocation(Page page, long location, int length) { this.page = page; this.currentLocation = location; this.length = length; @@ -186,7 +186,7 @@ public class PageFile { diskBound = new byte[length]; RandomAccessFile file = new RandomAccessFile(tmpFile, "r"); file.seek(diskBoundLocation); - int readNum = file.read(diskBound); + file.read(diskBound); file.close(); diskBoundLocation = -1; } diff --git a/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java b/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java index e71297501d..93c23b1344 100644 --- a/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java +++ b/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java @@ -30,7 +30,7 @@ public class Transaction implements Iterable { private RandomAccessFile tmpFile; private File txFile; - private int nextLocation = 0; + private long nextLocation = 0; /** * The PageOverflowIOException occurs when a page write is requested @@ -277,36 +277,38 @@ public class Transaction implements Iterable { // If overflow is allowed if (overflow) { - Page next; - if (current.getType() == Page.PAGE_PART_TYPE) { - next = load(current.getNext(), null); - } else { - next = allocate(); - } + do { + Page next; + if (current.getType() == Page.PAGE_PART_TYPE) { + next = load(current.getNext(), null); + } else { + next = allocate(); + } - next.txId = current.txId; + next.txId = current.txId; - // Write the page header - int oldPos = pos; - pos = 0; + // Write the page header + int oldPos = pos; + pos = 0; - current.makePagePart(next.getPageId(), getWriteTransactionId()); - current.write(this); + current.makePagePart(next.getPageId(), getWriteTransactionId()); + current.write(this); - // Do the page write.. - byte[] data = new byte[pageSize]; - System.arraycopy(buf, 0, data, 0, pageSize); - Transaction.this.write(current, data); + // Do the page write.. + byte[] data = new byte[pageSize]; + System.arraycopy(buf, 0, data, 0, pageSize); + Transaction.this.write(current, data); - // Reset for the next page chunk - pos = 0; - // The page header marshalled after the data is written. - skip(Page.PAGE_HEADER_SIZE); - // Move the overflow data after the header. - System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize); - pos += oldPos - pageSize; - current = next; + // Reset for the next page chunk + pos = 0; + // The page header marshalled after the data is written. + skip(Page.PAGE_HEADER_SIZE); + // Move the overflow data after the header. + System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize); + pos += oldPos - pageSize; + current = next; + } while (pos > pageSize); } else { throw new PageOverflowIOException("Page overflow."); } @@ -705,7 +707,7 @@ public class Transaction implements Iterable { if (tmpFile == null) { tmpFile = new RandomAccessFile(getTempFile(), "rw"); } - int location = nextLocation; + long location = nextLocation; tmpFile.seek(nextLocation); tmpFile.write(data); nextLocation = location + data.length; diff --git a/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java b/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java index 880ced6c4c..9074eff7ea 100644 --- a/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java +++ b/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import org.apache.kahadb.page.PageFile; import org.apache.kahadb.util.LongMarshaller; import org.apache.kahadb.util.StringMarshaller; import org.apache.kahadb.util.VariableMarshaller; @@ -220,8 +221,13 @@ public class BTreeIndexTest extends IndexTestSupport { index.remove(tx, key(1566)); } - public void x_testLargeValue() throws Exception { - createPageFileAndIndex(4*1024); + public void testLargeValue() throws Exception { + //System.setProperty("maxKahaDBTxSize", "" + (1024*1024*1024)); + pf = new PageFile(directory, getClass().getName()); + pf.setPageSize(4*1024); + pf.setEnablePageCaching(false); + pf.load(); + tx = pf.tx(); long id = tx.allocate().getPageId(); tx.commit(); @@ -232,9 +238,9 @@ public class BTreeIndexTest extends IndexTestSupport { tx.commit(); tx = pf.tx(); - String val = new String(new byte[93]); - final long numMessages = 2000; - final int numConsumers = 10000; + String val = new String(new byte[1024]); + final long numMessages = 10; + final int numConsumers = 200; for (long i=0; i hs = new HashSet(); @@ -243,13 +249,57 @@ public class BTreeIndexTest extends IndexTestSupport { } test.put(tx, i, hs); } + tx.commit(); + tx = pf.tx(); + for (long i=0; i hs = new HashSet(); + for (int j=numConsumers; j test = new BTreeIndex(pf, id); + test.setKeyMarshaller(LongMarshaller.INSTANCE); + test.setValueMarshaller(StringMarshaller.INSTANCE); + test.load(tx); + tx.commit(); + + final int stringSize = 6*1024; + tx = pf.tx(); + String val = new String(new byte[stringSize]); + final long numMessages = 1; + + for (long i=0; i= 0; i--) { index.put(tx, key(i), (long)i);