mirror of https://github.com/apache/activemq.git
allow for batch writes of forwarded acks.
This commit is contained in:
parent
b9334960e0
commit
15405af2e6
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.activemq.store.kahadb;
|
||||
|
||||
import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInput;
|
||||
|
@ -111,8 +113,6 @@ import org.apache.activemq.util.ThreadPoolUtils;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;
|
||||
|
||||
public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
|
||||
|
||||
protected BrokerService brokerService;
|
||||
|
@ -1981,7 +1981,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
compactionMarker.setRewriteType(COMPACTED_JOURNAL_FILE);
|
||||
|
||||
ByteSequence payload = toByteSequence(compactionMarker);
|
||||
appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs());
|
||||
appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
|
||||
LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
|
||||
|
||||
Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0));
|
||||
|
@ -1995,7 +1995,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
|
||||
if (command != null && command instanceof KahaRemoveMessageCommand) {
|
||||
payload = toByteSequence(command);
|
||||
Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs());
|
||||
Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
|
||||
updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue