From 40ae055f37bab3f0844a5f2ca6449839440db964 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Wed, 22 Jun 2011 12:55:36 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3374 - first stab at fixing long kahadb tx oom problem git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1138442 13f79535-47bb-0310-9956-ffa450edef68 --- .../usecases/DurableUnsubscribeTest.java | 11 +- .../org/apache/activemq/web/WebClient.java | 3 +- .../org/apache/kahadb/index/HashIndex.java | 2 +- .../java/org/apache/kahadb/page/PageFile.java | 218 ++++++++++-------- .../org/apache/kahadb/page/Transaction.java | 74 ++++-- 5 files changed, 191 insertions(+), 117 deletions(-) diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java index 6a83016c67..8a39703227 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.usecases; import java.lang.management.ManagementFactory; import javax.jms.Connection; +import javax.jms.MessageProducer; import javax.jms.Session; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -43,7 +44,15 @@ public class DurableUnsubscribeTest extends org.apache.activemq.TestSupport { Destination d = broker.getDestination(topic); assertEquals("Subscription is missing.", 1, d.getConsumers().size()); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(topic); + for (int i = 0; i < 1000; i++) { + producer.send(session.createTextMessage("text")); + } + + Thread.sleep(1000); + session.unsubscribe("SubsId"); session.close(); @@ -92,7 +101,7 @@ public class DurableUnsubscribeTest extends org.apache.activemq.TestSupport { private void createBroker() throws Exception { broker = BrokerFactory.createBroker("broker:(vm://localhost)"); - broker.setPersistent(false); + //broker.setPersistent(false); broker.setUseJmx(true); broker.setBrokerName(getName()); broker.start(); diff --git a/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java b/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java index 5fdbc11013..8cf551c802 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java @@ -61,7 +61,8 @@ import org.slf4j.LoggerFactory; * stored inside a HttpSession TODO controls to prevent DOS attacks with users * requesting many consumers TODO configure consumers with small prefetch. * - * + * + * */ public class WebClient implements HttpSessionActivationListener, HttpSessionBindingListener, Externalizable { diff --git a/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java b/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java index 8f89b97724..fc172499ef 100644 --- a/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java +++ b/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java @@ -345,7 +345,7 @@ public class HashIndex implements Index { tx.store(metadata.page, metadataMarshaller, true); calcThresholds(); - LOG.debug("Resizing done. New bins start at: "+metadata.binPageId); + LOG.debug("Resizing done. 
New bins start at: "+metadata.binPageId); resizeCapacity=0; resizePageId=0; } diff --git a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java index 93343184e8..5b6ce20637 100644 --- a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java +++ b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java @@ -138,15 +138,35 @@ public class PageFile { Page page; byte[] current; byte[] diskBound; + int currentLocation = -1; + int diskBoundLocation = -1; + File tmpFile; + int length; public PageWrite(Page page, byte[] data) { this.page=page; current=data; } + + public PageWrite(Page page, int currentLocation, int length, File tmpFile) { + this.page = page; + this.currentLocation = currentLocation; + this.tmpFile = tmpFile; + this.length = length; + } public void setCurrent(Page page, byte[] data) { this.page=page; current=data; + currentLocation = -1; + diskBoundLocation = -1; + } + + public void setCurrentLocation(Page page, int location, int length) { + this.page = page; + this.currentLocation = location; + this.length = length; + this.current = null; } @Override @@ -158,22 +178,42 @@ public class PageFile { public Page getPage() { return page; } + + public byte[] getDiskBound() throws IOException { + if (diskBound == null && diskBoundLocation != -1) { + diskBound = new byte[length]; + RandomAccessFile file = new RandomAccessFile(tmpFile, "r"); + file.seek(diskBoundLocation); + int readNum = file.read(diskBound); + file.close(); + diskBoundLocation = -1; + } + return diskBound; + } void begin() { - diskBound = current; - current = null; + if (currentLocation != -1) { + diskBoundLocation = currentLocation; + currentLocation = -1; + current = null; + } else { + diskBound = current; + current = null; + currentLocation = -1; + } } /** * @return true if there is no pending writes to do. */ boolean done() { + diskBoundLocation = -1; diskBound=null; - return current == null; + return current == null || currentLocation == -1; } boolean isDone() { - return diskBound == null && current == null; + return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1; } } @@ -470,7 +510,7 @@ public class PageFile { return new File(directory, IOHelper.toFileSystemSafeName(name)+RECOVERY_FILE_SUFFIX); } - private long toOffset(long pageId) { + public long toOffset(long pageId) { return PAGE_FILE_HEADER_SIZE+(pageId*pageSize); } @@ -823,6 +863,8 @@ public class PageFile { } } + boolean longTx = false; + for (Map.Entry entry : updates) { Long key = entry.getKey(); PageWrite value = entry.getValue(); @@ -830,12 +872,20 @@ public class PageFile { if( write==null ) { writes.put(key, value); } else { - write.setCurrent(value.page, value.current); + if (value.currentLocation != -1) { + write.setCurrentLocation(value.page, value.currentLocation, value.length); + write.tmpFile = value.tmpFile; + longTx = true; + } else { + write.setCurrent(value.page, value.current); + } } } // Once we start approaching capacity, notify the writer to start writing - if( canStartWriteBatch() ) { + // sync immediately for long txs + if( longTx || canStartWriteBatch() ) { + if( enabledWriteThread ) { writes.notify(); } else { @@ -919,115 +969,90 @@ public class PageFile { } } - /** - * - * @return true if there are still pending writes to do. 
- * @throws InterruptedException - * @throws IOException - */ - private void writeBatch() throws IOException { - - CountDownLatch checkpointLatch; - ArrayList batch; - synchronized( writes ) { + private void writeBatch() throws IOException { + + CountDownLatch checkpointLatch; + ArrayList batch; + synchronized( writes ) { // If there is not enough to write, wait for a notification... batch = new ArrayList(writes.size()); // build a write batch from the current write cache. for (PageWrite write : writes.values()) { batch.add(write); - // Move the current write to the diskBound write, this lets folks update the + // Move the current write to the diskBound write, this lets folks update the // page again without blocking for this write. write.begin(); - if (write.diskBound == null) { + if (write.diskBound == null && write.diskBoundLocation == -1) { batch.remove(write); } } - // Grab on to the existing checkpoint latch cause once we do this write we can + // Grab on to the existing checkpoint latch cause once we do this write we can // release the folks that were waiting for those writes to hit disk. checkpointLatch = this.checkpointLatch; this.checkpointLatch=null; - } - - try { - if (enableRecoveryFile) { + } - // Using Adler-32 instead of CRC-32 because it's much faster and - // it's - // weakness for short messages with few hundred bytes is not a - // factor in this case since we know - // our write batches are going to much larger. - Checksum checksum = new Adler32(); - for (PageWrite w : batch) { - try { - checksum.update(w.diskBound, 0, pageSize); - } catch (Throwable t) { - throw IOExceptionSupport.create( - "Cannot create recovery file. Reason: " + t, t); - } - } + Checksum checksum = new Adler32(); + recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE); + for (PageWrite w : batch) { + if (enableRecoveryFile) { + try { + checksum.update(w.getDiskBound(), 0, pageSize); + } catch (Throwable t) { + throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t); + } + recoveryFile.writeLong(w.page.getPageId()); + recoveryFile.write(w.getDiskBound(), 0, pageSize); + } - // Can we shrink the recovery buffer?? - if (recoveryPageCount > recoveryFileMaxPageCount) { - int t = Math.max(recoveryFileMinPageCount, batch.size()); - recoveryFile.setLength(recoveryFileSizeForPages(t)); - } + writeFile.seek(toOffset(w.page.getPageId())); + writeFile.write(w.getDiskBound(), 0, pageSize); + w.done(); + } - // Record the page writes in the recovery buffer. - recoveryFile.seek(0); - // Store the next tx id... - recoveryFile.writeLong(nextTxid.get()); - // Store the checksum for thw write batch so that on recovery we - // know if we have a consistent - // write batch on disk. - recoveryFile.writeLong(checksum.getValue()); - // Write the # of pages that will follow - recoveryFile.writeInt(batch.size()); + try { + if (enableRecoveryFile) { + // Can we shrink the recovery buffer?? + if (recoveryPageCount > recoveryFileMaxPageCount) { + int t = Math.max(recoveryFileMinPageCount, batch.size()); + recoveryFile.setLength(recoveryFileSizeForPages(t)); + } - // Write the pages. - recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE); + // Record the page writes in the recovery buffer. + recoveryFile.seek(0); + // Store the next tx id... + recoveryFile.writeLong(nextTxid.get()); + // Store the checksum for thw write batch so that on recovery we + // know if we have a consistent + // write batch on disk. 
+ recoveryFile.writeLong(checksum.getValue()); + // Write the # of pages that will follow + recoveryFile.writeInt(batch.size()); + } - for (PageWrite w : batch) { - recoveryFile.writeLong(w.page.getPageId()); - recoveryFile.write(w.diskBound, 0, pageSize); - } + if (enableDiskSyncs) { + // Sync to make sure recovery buffer writes land on disk.. + recoveryFile.getFD().sync(); + writeFile.getFD().sync(); + } + } finally { + synchronized (writes) { + for (PageWrite w : batch) { + // If there are no more pending writes, then remove it from + // the write cache. + if (w.isDone()) { + writes.remove(w.page.getPageId()); + } + } + } - if (enableDiskSyncs) { - // Sync to make sure recovery buffer writes land on disk.. - recoveryFile.getFD().sync(); - } - - recoveryPageCount = batch.size(); - } - - for (PageWrite w : batch) { - writeFile.seek(toOffset(w.page.getPageId())); - writeFile.write(w.diskBound, 0, pageSize); - w.done(); - } - - // Sync again - if (enableDiskSyncs) { - writeFile.getFD().sync(); - } - - } finally { - synchronized (writes) { - for (PageWrite w : batch) { - // If there are no more pending writes, then remove it from - // the write cache. - if (w.isDone()) { - writes.remove(w.page.getPageId()); - } - } - } - - if( checkpointLatch!=null ) { - checkpointLatch.countDown(); - } - } - } + if (checkpointLatch != null) { + checkpointLatch.countDown(); + } + } + } private long recoveryFileSizeForPages(int pageCount) { return RECOVERY_FILE_HEADER_SIZE+((pageSize+8)*pageCount); @@ -1135,4 +1160,7 @@ public class PageFile { return getMainPageFile(); } + public File getDirectory() { + return directory; + } } diff --git a/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java b/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java index 73539a8719..dac5d513cb 100644 --- a/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java +++ b/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java @@ -16,22 +16,11 @@ */ package org.apache.kahadb.page; -import java.io.DataInputStream; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.HashMap; -import java.util.Iterator; -import java.util.NoSuchElementException; +import java.io.*; +import java.util.*; import org.apache.kahadb.page.PageFile.PageWrite; -import org.apache.kahadb.util.ByteSequence; -import org.apache.kahadb.util.DataByteArrayInputStream; -import org.apache.kahadb.util.DataByteArrayOutputStream; -import org.apache.kahadb.util.Marshaller; -import org.apache.kahadb.util.Sequence; -import org.apache.kahadb.util.SequenceSet; +import org.apache.kahadb.util.*; /** * The class used to read/update a PageFile object. Using a transaction allows you to @@ -39,6 +28,11 @@ import org.apache.kahadb.util.SequenceSet; */ public class Transaction implements Iterable { + + private RandomAccessFile tmpFile; + private File txfFile; + private int nextLocation = 0; + /** * The PageOverflowIOException occurs when a page write is requested * and it's data is larger than what would fit into a single page. @@ -91,12 +85,16 @@ public class Transaction implements Iterable { // If this transaction is updating stuff.. this is the tx of private long writeTransactionId=-1; // List of pages that this transaction has modified. 
- private HashMap writes=new HashMap(); + private TreeMap writes=new TreeMap(); // List of pages allocated in this transaction private final SequenceSet allocateList = new SequenceSet(); // List of pages freed in this transaction private final SequenceSet freeList = new SequenceSet(); + private long maxTransactionSize = 10485760; + + private long size = 0; + Transaction(PageFile pageFile) { this.pageFile = pageFile; } @@ -650,7 +648,16 @@ public class Transaction implements Iterable { allocateList.clear(); writes.clear(); writeTransactionId = -1; + if (tmpFile != null) { + tmpFile.close(); + if (!getTempFile().delete()) { + throw new IOException("Can't delete temporary KahaDB transaction file:" + getTempFile()); + } + tmpFile = null; + txfFile = null; + } } + size = 0; } /** @@ -665,7 +672,16 @@ public class Transaction implements Iterable { allocateList.clear(); writes.clear(); writeTransactionId = -1; + if (tmpFile != null) { + tmpFile.close(); + if (getTempFile().delete()) { + throw new IOException("Can't delete temporary KahaDB transaction file:" + getTempFile()); + } + tmpFile = null; + txfFile = null; + } } + size = 0; } private long getWriteTransactionId() { @@ -675,16 +691,36 @@ public class Transaction implements Iterable { return writeTransactionId; } + + protected File getTempFile() { + if (txfFile == null) { + txfFile = new File(getPageFile().getDirectory(), IOHelper.toFileSystemSafeName(Long.toString(getWriteTransactionId())) + ".tmp"); + } + return txfFile; + } + /** * Queues up a page write that should get done when commit() gets called. */ @SuppressWarnings("unchecked") private void write(final Page page, byte[] data) throws IOException { Long key = page.getPageId(); - // TODO: if a large update transaction is in progress, we may want to move - // all the current updates to a temp file so that we don't keep using - // up memory. - writes.put(key, new PageWrite(page, data)); + size += data.length; + + PageWrite write; + if (size > maxTransactionSize) { + if (tmpFile == null) { + tmpFile = new RandomAccessFile(getTempFile(), "rw"); + } + int location = nextLocation; + tmpFile.seek(nextLocation); + tmpFile.write(data); + nextLocation = location + data.length; + write = new PageWrite(page, location, data.length, getTempFile()); + } else { + write = new PageWrite(page, data); + } + writes.put(key, write); } /**
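
Notes on the change (not part of the patch): the core of the fix is that a Transaction no longer keeps every dirty page's byte[] in its in-memory write cache for the life of the transaction. Transaction.write() keeps a running byte count, and once it passes maxTransactionSize (10485760 bytes here), further page data is appended to a per-transaction temp file (the write transaction id plus ".tmp", in the page file directory) and the PageWrite only remembers the offset and length of the spilled bytes. When PageFile.writeBatch() flushes, PageWrite.getDiskBound() reads those bytes back from the temp file; commit() and rollback() close and delete the temp file. Updates coming from such a long transaction also set the longTx flag in PageFile.write(), so the writer thread is notified immediately instead of waiting for the batch to fill. The test change makes DurableUnsubscribeTest run against a persistent broker and publish 1000 messages so that unsubscribing a durable subscription exercises a larger KahaDB transaction.

The sketch below is a minimal, self-contained illustration of the spill-to-temp-file idea only, not the KahaDB code itself: the class name SpillingWriteBuffer and the methods record(), dataFor() and clear() are invented for the example and are not part of any KahaDB API.

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.TreeMap;

/**
 * Simplified illustration of the approach taken by the patch: once the bytes
 * queued by a transaction pass a threshold, later page writes are appended to
 * a temp file and only their (offset, length) is kept in memory.
 */
public class SpillingWriteBuffer {

    /** In-memory form: data != null; spilled form: data == null, offset/length set. */
    static final class PendingWrite {
        final long pageId;
        byte[] data;          // null once the write has been spilled to the temp file
        long offset = -1;     // location of the spilled bytes in the temp file
        int length;

        PendingWrite(long pageId) { this.pageId = pageId; }
    }

    private final TreeMap<Long, PendingWrite> writes = new TreeMap<Long, PendingWrite>();
    private final File tempFile;
    private final long spillThreshold;     // analogous to maxTransactionSize in the patch
    private RandomAccessFile spill;        // lazily opened, like the tmpFile in Transaction
    private long totalBytes;               // running count of bytes queued so far
    private long nextSpillOffset;

    public SpillingWriteBuffer(File tempFile, long spillThreshold) {
        this.tempFile = tempFile;
        this.spillThreshold = spillThreshold;
    }

    /** Queue a page write, spilling its data to the temp file once the threshold is exceeded. */
    public void record(long pageId, byte[] data) throws IOException {
        PendingWrite w = new PendingWrite(pageId);
        totalBytes += data.length;
        if (totalBytes > spillThreshold) {
            if (spill == null) {
                spill = new RandomAccessFile(tempFile, "rw");
            }
            spill.seek(nextSpillOffset);
            spill.write(data);
            w.offset = nextSpillOffset;
            w.length = data.length;
            nextSpillOffset += data.length;
        } else {
            w.data = data;
            w.length = data.length;
        }
        writes.put(pageId, w);
    }

    /** Materialize the bytes for a pending write, reading spilled data back from the temp file. */
    public byte[] dataFor(PendingWrite w) throws IOException {
        if (w.data != null) {
            return w.data;
        }
        byte[] buf = new byte[w.length];
        RandomAccessFile in = new RandomAccessFile(tempFile, "r");
        try {
            in.seek(w.offset);
            in.readFully(buf);
        } finally {
            in.close();
        }
        return buf;
    }

    /** Discard all pending writes and remove the temp file, as commit()/rollback() do. */
    public void clear() throws IOException {
        writes.clear();
        totalBytes = 0;
        nextSpillOffset = 0;
        if (spill != null) {
            spill.close();
            if (!tempFile.delete()) {
                throw new IOException("Can't delete temporary file: " + tempFile);
            }
            spill = null;
        }
    }
}

A caller playing the role of writeBatch() would iterate the pending writes and call dataFor() to materialize each page's bytes only at the moment they are written to the main data file, so peak memory stays bounded by the spill threshold rather than by the total size of the transaction.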