mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3466: IndexOutOfBounds in kahadb with large number of subscriptions and pending messages. use long locations such that temp file appends do not overflow and ensure page file overflow does not leave oversized chunk, link pages till overflow fits in a page, + some additional tests
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1160681 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4b6648eba1
commit
d1e7d69fc1
|
@ -140,8 +140,8 @@ public class PageFile {
|
|||
Page page;
|
||||
byte[] current;
|
||||
byte[] diskBound;
|
||||
int currentLocation = -1;
|
||||
int diskBoundLocation = -1;
|
||||
long currentLocation = -1;
|
||||
long diskBoundLocation = -1;
|
||||
File tmpFile;
|
||||
int length;
|
||||
|
||||
|
@ -150,7 +150,7 @@ public class PageFile {
|
|||
current=data;
|
||||
}
|
||||
|
||||
public PageWrite(Page page, int currentLocation, int length, File tmpFile) {
|
||||
public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
|
||||
this.page = page;
|
||||
this.currentLocation = currentLocation;
|
||||
this.tmpFile = tmpFile;
|
||||
|
@ -164,7 +164,7 @@ public class PageFile {
|
|||
diskBoundLocation = -1;
|
||||
}
|
||||
|
||||
public void setCurrentLocation(Page page, int location, int length) {
|
||||
public void setCurrentLocation(Page page, long location, int length) {
|
||||
this.page = page;
|
||||
this.currentLocation = location;
|
||||
this.length = length;
|
||||
|
@ -186,7 +186,7 @@ public class PageFile {
|
|||
diskBound = new byte[length];
|
||||
RandomAccessFile file = new RandomAccessFile(tmpFile, "r");
|
||||
file.seek(diskBoundLocation);
|
||||
int readNum = file.read(diskBound);
|
||||
file.read(diskBound);
|
||||
file.close();
|
||||
diskBoundLocation = -1;
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ public class Transaction implements Iterable<Page> {
|
|||
|
||||
private RandomAccessFile tmpFile;
|
||||
private File txFile;
|
||||
private int nextLocation = 0;
|
||||
private long nextLocation = 0;
|
||||
|
||||
/**
|
||||
* The PageOverflowIOException occurs when a page write is requested
|
||||
|
@ -277,36 +277,38 @@ public class Transaction implements Iterable<Page> {
|
|||
// If overflow is allowed
|
||||
if (overflow) {
|
||||
|
||||
Page next;
|
||||
if (current.getType() == Page.PAGE_PART_TYPE) {
|
||||
next = load(current.getNext(), null);
|
||||
} else {
|
||||
next = allocate();
|
||||
}
|
||||
do {
|
||||
Page next;
|
||||
if (current.getType() == Page.PAGE_PART_TYPE) {
|
||||
next = load(current.getNext(), null);
|
||||
} else {
|
||||
next = allocate();
|
||||
}
|
||||
|
||||
next.txId = current.txId;
|
||||
next.txId = current.txId;
|
||||
|
||||
// Write the page header
|
||||
int oldPos = pos;
|
||||
pos = 0;
|
||||
// Write the page header
|
||||
int oldPos = pos;
|
||||
pos = 0;
|
||||
|
||||
current.makePagePart(next.getPageId(), getWriteTransactionId());
|
||||
current.write(this);
|
||||
current.makePagePart(next.getPageId(), getWriteTransactionId());
|
||||
current.write(this);
|
||||
|
||||
// Do the page write..
|
||||
byte[] data = new byte[pageSize];
|
||||
System.arraycopy(buf, 0, data, 0, pageSize);
|
||||
Transaction.this.write(current, data);
|
||||
// Do the page write..
|
||||
byte[] data = new byte[pageSize];
|
||||
System.arraycopy(buf, 0, data, 0, pageSize);
|
||||
Transaction.this.write(current, data);
|
||||
|
||||
// Reset for the next page chunk
|
||||
pos = 0;
|
||||
// The page header marshalled after the data is written.
|
||||
skip(Page.PAGE_HEADER_SIZE);
|
||||
// Move the overflow data after the header.
|
||||
System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
|
||||
pos += oldPos - pageSize;
|
||||
current = next;
|
||||
// Reset for the next page chunk
|
||||
pos = 0;
|
||||
// The page header marshalled after the data is written.
|
||||
skip(Page.PAGE_HEADER_SIZE);
|
||||
// Move the overflow data after the header.
|
||||
System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
|
||||
pos += oldPos - pageSize;
|
||||
current = next;
|
||||
|
||||
} while (pos > pageSize);
|
||||
} else {
|
||||
throw new PageOverflowIOException("Page overflow.");
|
||||
}
|
||||
|
@ -705,7 +707,7 @@ public class Transaction implements Iterable<Page> {
|
|||
if (tmpFile == null) {
|
||||
tmpFile = new RandomAccessFile(getTempFile(), "rw");
|
||||
}
|
||||
int location = nextLocation;
|
||||
long location = nextLocation;
|
||||
tmpFile.seek(nextLocation);
|
||||
tmpFile.write(data);
|
||||
nextLocation = location + data.length;
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.kahadb.page.PageFile;
|
||||
import org.apache.kahadb.util.LongMarshaller;
|
||||
import org.apache.kahadb.util.StringMarshaller;
|
||||
import org.apache.kahadb.util.VariableMarshaller;
|
||||
|
@ -220,8 +221,13 @@ public class BTreeIndexTest extends IndexTestSupport {
|
|||
index.remove(tx, key(1566));
|
||||
}
|
||||
|
||||
public void x_testLargeValue() throws Exception {
|
||||
createPageFileAndIndex(4*1024);
|
||||
public void testLargeValue() throws Exception {
|
||||
//System.setProperty("maxKahaDBTxSize", "" + (1024*1024*1024));
|
||||
pf = new PageFile(directory, getClass().getName());
|
||||
pf.setPageSize(4*1024);
|
||||
pf.setEnablePageCaching(false);
|
||||
pf.load();
|
||||
tx = pf.tx();
|
||||
long id = tx.allocate().getPageId();
|
||||
tx.commit();
|
||||
|
||||
|
@ -232,9 +238,9 @@ public class BTreeIndexTest extends IndexTestSupport {
|
|||
tx.commit();
|
||||
|
||||
tx = pf.tx();
|
||||
String val = new String(new byte[93]);
|
||||
final long numMessages = 2000;
|
||||
final int numConsumers = 10000;
|
||||
String val = new String(new byte[1024]);
|
||||
final long numMessages = 10;
|
||||
final int numConsumers = 200;
|
||||
|
||||
for (long i=0; i<numMessages; i++) {
|
||||
HashSet<String> hs = new HashSet<String>();
|
||||
|
@ -243,13 +249,57 @@ public class BTreeIndexTest extends IndexTestSupport {
|
|||
}
|
||||
test.put(tx, i, hs);
|
||||
}
|
||||
tx.commit();
|
||||
tx = pf.tx();
|
||||
for (long i=0; i<numMessages; i++) {
|
||||
HashSet<String> hs = new HashSet<String>();
|
||||
for (int j=numConsumers; j<numConsumers*2;j++) {
|
||||
hs.add(val + "SOME TEXT" + j);
|
||||
}
|
||||
test.put(tx, i, hs);
|
||||
}
|
||||
|
||||
tx.commit();
|
||||
tx = pf.tx();
|
||||
for (long i=0; i<numMessages; i++) {
|
||||
test.get(tx, i);
|
||||
}
|
||||
tx.commit();
|
||||
}
|
||||
|
||||
public void testLargeValueOverflow() throws Exception {
|
||||
pf = new PageFile(directory, getClass().getName());
|
||||
pf.setPageSize(4*1024);
|
||||
pf.setEnablePageCaching(false);
|
||||
pf.setWriteBatchSize(1);
|
||||
pf.load();
|
||||
tx = pf.tx();
|
||||
long id = tx.allocate().getPageId();
|
||||
|
||||
BTreeIndex<Long, String> test = new BTreeIndex<Long, String>(pf, id);
|
||||
test.setKeyMarshaller(LongMarshaller.INSTANCE);
|
||||
test.setValueMarshaller(StringMarshaller.INSTANCE);
|
||||
test.load(tx);
|
||||
tx.commit();
|
||||
|
||||
final int stringSize = 6*1024;
|
||||
tx = pf.tx();
|
||||
String val = new String(new byte[stringSize]);
|
||||
final long numMessages = 1;
|
||||
|
||||
for (long i=0; i<numMessages; i++) {
|
||||
test.put(tx, i, val);
|
||||
}
|
||||
tx.commit();
|
||||
|
||||
tx = pf.tx();
|
||||
for (long i=0; i<numMessages; i++) {
|
||||
String s = test.get(tx, i);
|
||||
assertEquals("len is as expected", stringSize, s.length());
|
||||
}
|
||||
tx.commit();
|
||||
}
|
||||
|
||||
void doInsertReverse(int count) throws Exception {
|
||||
for (int i = count-1; i >= 0; i--) {
|
||||
index.put(tx, key(i), (long)i);
|
||||
|
|
Loading…
Reference in New Issue