ARTEMIS-2239 Zero-copy NIO/MAPPED TimedBuffer
NIO/MAPPED journal types can use directly the buffer of TimedBuffer to perform file writes, avoiding an expensive copy + zeroing.
This commit is contained in:
parent
b80a78d885
commit
4da9d84311
|
@ -25,6 +25,7 @@ import java.util.List;
|
|||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
|
@ -58,7 +59,7 @@ public abstract class AbstractSequentialFile implements SequentialFile {
|
|||
* Instead of having AIOSequentialFile implementing the Observer, I have done it on an inner class.
|
||||
* This is the class returned to the factory when the file is being activated.
|
||||
*/
|
||||
protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
|
||||
protected final TimedBufferObserver timedBufferObserver = createTimedBufferObserver();
|
||||
|
||||
/**
|
||||
* @param file
|
||||
|
@ -74,6 +75,10 @@ public abstract class AbstractSequentialFile implements SequentialFile {
|
|||
this.factory = factory;
|
||||
}
|
||||
|
||||
protected TimedBufferObserver createTimedBufferObserver() {
|
||||
return new LocalBufferObserver();
|
||||
}
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
|
||||
@Override
|
||||
|
@ -252,43 +257,6 @@ public abstract class AbstractSequentialFile implements SequentialFile {
|
|||
return file;
|
||||
}
|
||||
|
||||
private static final class DelegateCallback implements IOCallback {
|
||||
|
||||
final List<IOCallback> delegates;
|
||||
|
||||
private DelegateCallback(final List<IOCallback> delegates) {
|
||||
this.delegates = delegates;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void done() {
|
||||
final int size = delegates.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
try {
|
||||
delegates.get(i).done();
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final int errorCode, final String errorMessage) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("onError" + " code: " + errorCode + " message: " + errorMessage);
|
||||
}
|
||||
|
||||
final int size = delegates.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
try {
|
||||
delegates.get(i).onError(errorCode, errorMessage);
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected ByteBuffer newBuffer(int size, int limit) {
|
||||
size = factory.calculateBlockSize(size);
|
||||
limit = factory.calculateBlockSize(limit);
|
||||
|
@ -301,21 +269,19 @@ public abstract class AbstractSequentialFile implements SequentialFile {
|
|||
protected class LocalBufferObserver implements TimedBufferObserver {
|
||||
|
||||
@Override
|
||||
public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOCallback> callbacks) {
|
||||
buffer.flip();
|
||||
|
||||
if (buffer.limit() == 0) {
|
||||
factory.releaseBuffer(buffer);
|
||||
} else {
|
||||
public void flushBuffer(final ByteBuf byteBuf, final boolean requestedSync, final List<IOCallback> callbacks) {
|
||||
final int bytes = byteBuf.readableBytes();
|
||||
if (bytes > 0) {
|
||||
final ByteBuffer buffer = newBuffer(byteBuf.capacity(), bytes);
|
||||
buffer.limit(bytes);
|
||||
byteBuf.getBytes(byteBuf.readerIndex(), buffer);
|
||||
buffer.flip();
|
||||
writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
|
||||
} else {
|
||||
IOCallback.done(callbacks);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer newBuffer(final int size, final int limit) {
|
||||
return AbstractSequentialFile.this.newBuffer(size, limit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRemainingBytes() {
|
||||
if (fileSize - position.get() > Integer.MAX_VALUE) {
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.io;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* It is a utility class to allow several {@link IOCallback}s to be used as one.
|
||||
*/
|
||||
public final class DelegateCallback implements IOCallback {
|
||||
|
||||
private final Collection<? extends IOCallback> delegates;
|
||||
|
||||
/**
|
||||
* It doesn't copy defensively the given {@code delegates}.
|
||||
*
|
||||
* @throws NullPointerException if {@code delegates} is {@code null}
|
||||
*/
|
||||
public DelegateCallback(final Collection<? extends IOCallback> delegates) {
|
||||
Objects.requireNonNull(delegates, "delegates cannot be null!");
|
||||
this.delegates = delegates;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void done() {
|
||||
IOCallback.done(delegates);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final int errorCode, final String errorMessage) {
|
||||
IOCallback.onError(delegates, errorCode, errorMessage);
|
||||
}
|
||||
|
||||
}
|
|
@ -16,6 +16,10 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.io;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
|
||||
/**
|
||||
* The interface used for AIO Callbacks.
|
||||
*/
|
||||
|
@ -32,4 +36,24 @@ public interface IOCallback {
|
|||
* Observation: The whole file will be probably failing if this happens. Like, if you delete the file, you will start to get errors for these operations
|
||||
*/
|
||||
void onError(int errorCode, String errorMessage);
|
||||
|
||||
static void done(Collection<? extends IOCallback> delegates) {
|
||||
delegates.forEach(callback -> {
|
||||
try {
|
||||
callback.done();
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static void onError(Collection<? extends IOCallback> delegates, int errorCode, final String errorMessage) {
|
||||
delegates.forEach(callback -> {
|
||||
try {
|
||||
callback.onError(errorCode, errorMessage);
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -358,13 +358,7 @@ public final class TimedBuffer extends CriticalComponentImpl {
|
|||
bytesFlushed.addAndGet(pos);
|
||||
}
|
||||
|
||||
final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
|
||||
//bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!!
|
||||
bufferToFlush.limit(pos);
|
||||
//perform memcpy under the hood due to the off heap buffer
|
||||
buffer.getBytes(0, bufferToFlush);
|
||||
|
||||
bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
|
||||
bufferObserver.flushBuffer(buffer.byteBuf(), pendingSync, callbacks);
|
||||
|
||||
stopSpin();
|
||||
|
||||
|
|
|
@ -16,38 +16,22 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.io.buffer;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
|
||||
public interface TimedBufferObserver {
|
||||
|
||||
// Constants -----------------------------------------------------
|
||||
|
||||
// Attributes ----------------------------------------------------
|
||||
|
||||
// Static --------------------------------------------------------
|
||||
|
||||
// Constructors --------------------------------------------------
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
|
||||
void flushBuffer(ByteBuffer buffer, boolean syncRequested, List<IOCallback> callbacks);
|
||||
/**
|
||||
* It flushes {@link ByteBuf#readableBytes()} of {@code buffer} without changing its reader/writer indexes.<br>
|
||||
* It just use {@code buffer} temporary: it can be reused by the caller right after this call.
|
||||
*/
|
||||
void flushBuffer(ByteBuf buffer, boolean syncRequested, List<IOCallback> callbacks);
|
||||
|
||||
/**
|
||||
* Return the number of remaining bytes that still fit on the observer (file)
|
||||
*/
|
||||
int getRemainingBytes();
|
||||
|
||||
ByteBuffer newBuffer(int size, int limit);
|
||||
|
||||
// Package protected ---------------------------------------------
|
||||
|
||||
// Protected -----------------------------------------------------
|
||||
|
||||
// Private -------------------------------------------------------
|
||||
|
||||
// Inner classes -------------------------------------------------
|
||||
|
||||
}
|
||||
|
|
|
@ -22,8 +22,11 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
|
||||
import org.apache.activemq.artemis.core.io.DummyCallback;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
|
@ -32,7 +35,6 @@ import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
|
|||
import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
|
||||
final class TimedSequentialFile implements SequentialFile {
|
||||
|
||||
|
@ -239,88 +241,44 @@ final class TimedSequentialFile implements SequentialFile {
|
|||
return this.sequentialFile.getJavaFile();
|
||||
}
|
||||
|
||||
private static void invokeDoneOn(List<? extends IOCallback> callbacks) {
|
||||
final int size = callbacks.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
try {
|
||||
final IOCallback callback = callbacks.get(i);
|
||||
callback.done();
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void invokeOnErrorOn(final int errorCode,
|
||||
final String errorMessage,
|
||||
List<? extends IOCallback> callbacks) {
|
||||
final int size = callbacks.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
try {
|
||||
final IOCallback callback = callbacks.get(i);
|
||||
callback.onError(errorCode, errorMessage);
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final class DelegateCallback implements IOCallback {
|
||||
|
||||
List<IOCallback> delegates;
|
||||
|
||||
private DelegateCallback() {
|
||||
this.delegates = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void done() {
|
||||
invokeDoneOn(delegates);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final int errorCode, final String errorMessage) {
|
||||
invokeOnErrorOn(errorCode, errorMessage, delegates);
|
||||
}
|
||||
}
|
||||
|
||||
private final class LocalBufferObserver implements TimedBufferObserver {
|
||||
|
||||
private final DelegateCallback delegateCallback = new DelegateCallback();
|
||||
|
||||
@Override
|
||||
public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOCallback> callbacks) {
|
||||
buffer.flip();
|
||||
|
||||
if (buffer.limit() == 0) {
|
||||
public void flushBuffer(final ByteBuf byteBuf, final boolean requestedSync, final List<IOCallback> callbacks) {
|
||||
final int bytes = byteBuf.readableBytes();
|
||||
if (bytes > 0) {
|
||||
final boolean releaseBuffer;
|
||||
final ByteBuffer buffer;
|
||||
if (byteBuf.nioBufferCount() == 1) {
|
||||
//any ByteBuffer is fine with the MAPPED journal
|
||||
releaseBuffer = false;
|
||||
buffer = byteBuf.internalNioBuffer(byteBuf.readerIndex(), bytes);
|
||||
} else {
|
||||
//perform the copy on buffer
|
||||
releaseBuffer = true;
|
||||
buffer = factory.newBuffer(byteBuf.capacity());
|
||||
buffer.limit(bytes);
|
||||
byteBuf.getBytes(byteBuf.readerIndex(), buffer);
|
||||
buffer.flip();
|
||||
}
|
||||
try {
|
||||
invokeDoneOn(callbacks);
|
||||
} finally {
|
||||
factory.releaseBuffer(buffer);
|
||||
blockingWriteDirect(buffer, requestedSync, releaseBuffer);
|
||||
IOCallback.done(callbacks);
|
||||
} catch (Throwable t) {
|
||||
final int code;
|
||||
if (t instanceof IOException) {
|
||||
code = ActiveMQExceptionType.IO_ERROR.getCode();
|
||||
factory.onIOError(new ActiveMQIOErrorException(t.getMessage(), t), t.getMessage(), TimedSequentialFile.this.sequentialFile);
|
||||
} else {
|
||||
code = ActiveMQExceptionType.GENERIC_EXCEPTION.getCode();
|
||||
}
|
||||
IOCallback.onError(callbacks, code, t.getMessage());
|
||||
}
|
||||
} else {
|
||||
if (callbacks.isEmpty()) {
|
||||
try {
|
||||
sequentialFile.writeDirect(buffer, requestedSync);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
} else {
|
||||
delegateCallback.delegates = callbacks;
|
||||
try {
|
||||
sequentialFile.writeDirect(buffer, requestedSync, delegateCallback);
|
||||
} finally {
|
||||
delegateCallback.delegates = null;
|
||||
}
|
||||
}
|
||||
IOCallback.done(callbacks);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer newBuffer(final int size, final int limit) {
|
||||
return factory.newBuffer(limit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRemainingBytes() {
|
||||
try {
|
||||
|
|
|
@ -22,16 +22,20 @@ import java.nio.ByteBuffer;
|
|||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
||||
import org.apache.activemq.artemis.core.io.AbstractSequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.DelegateCallback;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
import org.apache.activemq.artemis.utils.Env;
|
||||
|
@ -51,6 +55,11 @@ public class NIOSequentialFile extends AbstractSequentialFile {
|
|||
this.maxIO = maxIO;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TimedBufferObserver createTimedBufferObserver() {
|
||||
return new SyncLocalBufferObserver();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int calculateBlockStart(final int position) {
|
||||
return position;
|
||||
|
@ -324,4 +333,30 @@ public class NIOSequentialFile extends AbstractSequentialFile {
|
|||
}
|
||||
SequentialFile.appendTo(getFile().toPath(), dstFile.getJavaFile().toPath());
|
||||
}
|
||||
|
||||
private class SyncLocalBufferObserver extends LocalBufferObserver {
|
||||
|
||||
@Override
|
||||
public void flushBuffer(ByteBuf byteBuf, boolean requestedSync, List<IOCallback> callbacks) {
|
||||
//maybe no need to perform any copy
|
||||
final int bytes = byteBuf.readableBytes();
|
||||
if (bytes == 0) {
|
||||
IOCallback.done(callbacks);
|
||||
} else {
|
||||
//enable zero copy case
|
||||
if (byteBuf.nioBufferCount() == 1 && byteBuf.isDirect()) {
|
||||
final ByteBuffer buffer = byteBuf.internalNioBuffer(byteBuf.readerIndex(), bytes);
|
||||
final IOCallback callback = new DelegateCallback(callbacks);
|
||||
try {
|
||||
//no need to pool the buffer and don't care if the NIO buffer got modified
|
||||
internalWrite(buffer, requestedSync, callback, false);
|
||||
} catch (Exception e) {
|
||||
callback.onError(ActiveMQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage());
|
||||
}
|
||||
} else {
|
||||
super.flushBuffer(byteBuf, requestedSync, callbacks);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,122 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.io;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class DelegateCallbackTest {
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void shouldFailWithNullDelegates() {
|
||||
new DelegateCallback(null);
|
||||
}
|
||||
|
||||
private static final class CountingIOCallback implements IOCallback {
|
||||
|
||||
long done = 0;
|
||||
long onError = 0;
|
||||
final boolean fail;
|
||||
|
||||
private CountingIOCallback(boolean fail) {
|
||||
this.fail = fail;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void done() {
|
||||
done++;
|
||||
if (fail) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
onError++;
|
||||
if (fail) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCallDoneOnEachCallback() {
|
||||
final CountingIOCallback countingIOCallback = new CountingIOCallback(false);
|
||||
final DelegateCallback callback = new DelegateCallback(Arrays.asList(countingIOCallback, countingIOCallback));
|
||||
callback.done();
|
||||
Assert.assertEquals(2, countingIOCallback.done);
|
||||
Assert.assertEquals(0, countingIOCallback.onError);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCallOnErrorOnEachCallback() {
|
||||
final CountingIOCallback countingIOCallback = new CountingIOCallback(false);
|
||||
final DelegateCallback callback = new DelegateCallback(Arrays.asList(countingIOCallback, countingIOCallback));
|
||||
callback.onError(0, "not a real error");
|
||||
Assert.assertEquals(0, countingIOCallback.done);
|
||||
Assert.assertEquals(2, countingIOCallback.onError);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCallDoneOnEachCallbackWithExceptions() {
|
||||
final CountingIOCallback countingIOCallback = new CountingIOCallback(true);
|
||||
final DelegateCallback callback = new DelegateCallback(Arrays.asList(countingIOCallback, countingIOCallback));
|
||||
callback.done();
|
||||
Assert.assertEquals(2, countingIOCallback.done);
|
||||
Assert.assertEquals(0, countingIOCallback.onError);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCallOnErrorOnEachCallbackWithExceptions() {
|
||||
final CountingIOCallback countingIOCallback = new CountingIOCallback(true);
|
||||
final DelegateCallback callback = new DelegateCallback(Arrays.asList(countingIOCallback, countingIOCallback));
|
||||
callback.onError(0, "not a real error");
|
||||
Assert.assertEquals(0, countingIOCallback.done);
|
||||
Assert.assertEquals(2, countingIOCallback.onError);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldLogOnDoneForEachExceptions() {
|
||||
AssertionLoggerHandler.startCapture();
|
||||
try {
|
||||
final CountingIOCallback countingIOCallback = new CountingIOCallback(true);
|
||||
final DelegateCallback callback = new DelegateCallback(Collections.singleton(countingIOCallback));
|
||||
callback.done();
|
||||
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ142024"));
|
||||
} finally {
|
||||
AssertionLoggerHandler.stopCapture();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldLogOnErrorForEachExceptions() {
|
||||
AssertionLoggerHandler.startCapture();
|
||||
try {
|
||||
final CountingIOCallback countingIOCallback = new CountingIOCallback(true);
|
||||
final DelegateCallback callback = new DelegateCallback(Collections.singleton(countingIOCallback));
|
||||
callback.onError(0, "not a real error");
|
||||
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ142025"));
|
||||
} finally {
|
||||
AssertionLoggerHandler.stopCapture();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,205 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.io;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueue;
|
||||
import org.apache.activemq.artemis.ArtemisConstants;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.core.journal.Journal;
|
||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
||||
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
|
||||
/**
|
||||
* To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
|
||||
*/
|
||||
public class JournalTptBenchmark {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final boolean useDefaultIoExecutor = true;
|
||||
final int fileSize = 10 * 1024 * 1024;
|
||||
final boolean dataSync = false;
|
||||
final Type type = Type.Mapped;
|
||||
final int tests = 10;
|
||||
final int warmup = 20_000;
|
||||
final int measurements = 100_000;
|
||||
final int msgSize = 100;
|
||||
final byte[] msgContent = new byte[msgSize];
|
||||
Arrays.fill(msgContent, (byte) 1);
|
||||
final int totalMessages = (measurements * tests + warmup);
|
||||
final File tmpDirectory = new File("./");
|
||||
//using the default configuration when the broker starts!
|
||||
final SequentialFileFactory factory;
|
||||
switch (type) {
|
||||
|
||||
case Mapped:
|
||||
factory = new MappedSequentialFileFactory(tmpDirectory, fileSize, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null)
|
||||
.setDatasync(dataSync);
|
||||
break;
|
||||
case Nio:
|
||||
factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null, null).setDatasync(dataSync);
|
||||
break;
|
||||
case Aio:
|
||||
factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null, null).setDatasync(dataSync);
|
||||
//disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse();
|
||||
if (!LibaioContext.isLoaded()) {
|
||||
throw new IllegalStateException("lib AIO not loaded!");
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("unsupported case");
|
||||
}
|
||||
|
||||
int numFiles = (int) (totalMessages * factory.calculateBlockSize(msgSize)) / fileSize;
|
||||
if (numFiles < 2) {
|
||||
numFiles = 2;
|
||||
}
|
||||
ExecutorService service = null;
|
||||
final Journal journal;
|
||||
if (useDefaultIoExecutor) {
|
||||
journal = new JournalImpl(fileSize, numFiles, numFiles, Integer.MAX_VALUE, 100, factory, "activemq-data", "amq", factory.getMaxIO());
|
||||
journal.start();
|
||||
} else {
|
||||
final ArrayList<MpscArrayQueue<Runnable>> tasks = new ArrayList<>();
|
||||
service = Executors.newSingleThreadExecutor();
|
||||
journal = new JournalImpl(() -> new ArtemisExecutor() {
|
||||
|
||||
private final MpscArrayQueue<Runnable> taskQueue = new MpscArrayQueue<>(1024);
|
||||
|
||||
{
|
||||
tasks.add(taskQueue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
while (!taskQueue.offer(command)) {
|
||||
LockSupport.parkNanos(1L);
|
||||
}
|
||||
}
|
||||
}, fileSize, numFiles, numFiles, Integer.MAX_VALUE, 100, factory, "activemq-data", "amq", factory.getMaxIO(), 0);
|
||||
journal.start();
|
||||
service.execute(() -> {
|
||||
final int size = tasks.size();
|
||||
final int capacity = 1024;
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
for (int i = 0; i < size; i++) {
|
||||
final MpscArrayQueue<Runnable> runnables = tasks.get(i);
|
||||
for (int j = 0; j < capacity; j++) {
|
||||
final Runnable task = runnables.poll();
|
||||
if (task == null) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
task.run();
|
||||
} catch (Throwable t) {
|
||||
System.err.println(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
try {
|
||||
journal.load(new ArrayList<RecordInfo>(), null, null);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
try {
|
||||
final EncodingSupport encodingSupport = new EncodingSupport() {
|
||||
@Override
|
||||
public int getEncodeSize() {
|
||||
return msgSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encode(ActiveMQBuffer buffer) {
|
||||
final int writerIndex = buffer.writerIndex();
|
||||
buffer.setBytes(writerIndex, msgContent);
|
||||
buffer.writerIndex(writerIndex + msgSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ActiveMQBuffer buffer) {
|
||||
|
||||
}
|
||||
};
|
||||
long id = 1;
|
||||
{
|
||||
final long elapsed = writeMeasurements(id, journal, encodingSupport, warmup);
|
||||
id += warmup;
|
||||
System.out.println("warmup:" + (measurements * 1000_000_000L) / elapsed + " ops/sec");
|
||||
}
|
||||
for (int t = 0; t < tests; t++) {
|
||||
final long elapsed = writeMeasurements(id, journal, encodingSupport, measurements);
|
||||
System.out.println((measurements * 1000_000_000L) / elapsed + " ops/sec");
|
||||
id += warmup;
|
||||
}
|
||||
|
||||
} finally {
|
||||
journal.stop();
|
||||
if (service != null) {
|
||||
service.shutdown();
|
||||
}
|
||||
final File[] fileToDeletes = tmpDirectory.listFiles();
|
||||
System.out.println("Files to deletes" + Arrays.toString(fileToDeletes));
|
||||
Stream.of(fileToDeletes).forEach(File::delete);
|
||||
}
|
||||
}
|
||||
|
||||
private static long writeMeasurements(long id,
|
||||
Journal journal,
|
||||
EncodingSupport encodingSupport,
|
||||
int measurements) throws Exception {
|
||||
System.gc();
|
||||
TimeUnit.SECONDS.sleep(2);
|
||||
|
||||
final long start = System.nanoTime();
|
||||
for (int i = 0; i < measurements; i++) {
|
||||
write(id, journal, encodingSupport);
|
||||
id++;
|
||||
}
|
||||
final long elapsed = System.nanoTime() - start;
|
||||
return elapsed;
|
||||
}
|
||||
|
||||
private static void write(long id, Journal journal, EncodingSupport encodingSupport) throws Exception {
|
||||
journal.appendAddRecord(id, (byte) 1, encodingSupport, false);
|
||||
journal.appendUpdateRecord(id, (byte) 1, encodingSupport, true);
|
||||
}
|
||||
|
||||
private enum Type {
|
||||
|
||||
Mapped, Nio, Aio
|
||||
|
||||
}
|
||||
}
|
|
@ -1,196 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.io;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.activemq.artemis.ArtemisConstants;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||
|
||||
/**
|
||||
* To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
|
||||
*/
|
||||
public class SequentialFileTptBenchmark {
|
||||
|
||||
private static final FastWaitIOCallback CALLBACK = new FastWaitIOCallback();
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final boolean dataSync = false;
|
||||
final boolean writeSync = true;
|
||||
final Type type = Type.Mapped;
|
||||
final int tests = 10;
|
||||
final int warmup = 20_000;
|
||||
final int measurements = 100_000;
|
||||
final int msgSize = 100;
|
||||
final byte[] msgContent = new byte[msgSize];
|
||||
Arrays.fill(msgContent, (byte) 1);
|
||||
final File tmpDirectory = new File("./");
|
||||
//using the default configuration when the broker starts!
|
||||
final SequentialFileFactory factory;
|
||||
switch (type) {
|
||||
|
||||
case Mapped:
|
||||
final int fileSize = Math.max(msgSize * measurements, msgSize * warmup);
|
||||
factory = new MappedSequentialFileFactory(tmpDirectory, fileSize, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null).setDatasync(dataSync);
|
||||
break;
|
||||
case Nio:
|
||||
factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null, null).setDatasync(dataSync);
|
||||
break;
|
||||
case Aio:
|
||||
factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null, null).setDatasync(dataSync);
|
||||
//disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse();
|
||||
if (!LibaioContext.isLoaded()) {
|
||||
throw new IllegalStateException("lib AIO not loaded!");
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("unsupported case");
|
||||
}
|
||||
factory.start();
|
||||
try {
|
||||
final EncodingSupport encodingSupport = new EncodingSupport() {
|
||||
@Override
|
||||
public int getEncodeSize() {
|
||||
return msgSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encode(ActiveMQBuffer buffer) {
|
||||
final int writerIndex = buffer.writerIndex();
|
||||
buffer.setBytes(writerIndex, msgContent);
|
||||
buffer.writerIndex(writerIndex + msgSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ActiveMQBuffer buffer) {
|
||||
|
||||
}
|
||||
};
|
||||
final int alignedMessageSize = factory.calculateBlockSize(msgSize);
|
||||
final long totalFileSize = Math.max(alignedMessageSize * measurements, alignedMessageSize * warmup);
|
||||
if (totalFileSize > Integer.MAX_VALUE)
|
||||
throw new IllegalArgumentException("reduce measurements/warmup");
|
||||
final int fileSize = (int) totalFileSize;
|
||||
final SequentialFile sequentialFile = factory.createSequentialFile("seq.dat");
|
||||
sequentialFile.getJavaFile().delete();
|
||||
sequentialFile.getJavaFile().deleteOnExit();
|
||||
sequentialFile.open();
|
||||
final long startZeros = System.nanoTime();
|
||||
sequentialFile.fill(fileSize);
|
||||
final long elapsedZeros = System.nanoTime() - startZeros;
|
||||
System.out.println("Zeroed " + fileSize + " bytes in " + TimeUnit.NANOSECONDS.toMicros(elapsedZeros) + " us");
|
||||
try {
|
||||
{
|
||||
final long elapsed = writeMeasurements(factory, sequentialFile, encodingSupport, warmup, writeSync);
|
||||
System.out.println("warmup:" + (measurements * 1000_000_000L) / elapsed + " ops/sec");
|
||||
}
|
||||
for (int t = 0; t < tests; t++) {
|
||||
final long elapsed = writeMeasurements(factory, sequentialFile, encodingSupport, measurements, writeSync);
|
||||
System.out.println((measurements * 1000_000_000L) / elapsed + " ops/sec");
|
||||
}
|
||||
} finally {
|
||||
sequentialFile.close();
|
||||
}
|
||||
} finally {
|
||||
factory.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private static long writeMeasurements(SequentialFileFactory sequentialFileFactory,
|
||||
SequentialFile sequentialFile,
|
||||
EncodingSupport encodingSupport,
|
||||
int measurements,
|
||||
boolean writeSync) throws Exception {
|
||||
//System.gc();
|
||||
TimeUnit.SECONDS.sleep(2);
|
||||
sequentialFileFactory.activateBuffer(sequentialFile);
|
||||
sequentialFile.position(0);
|
||||
final long start = System.nanoTime();
|
||||
for (int i = 0; i < measurements; i++) {
|
||||
write(sequentialFile, encodingSupport, writeSync);
|
||||
}
|
||||
sequentialFileFactory.deactivateBuffer();
|
||||
final long elapsed = System.nanoTime() - start;
|
||||
return elapsed;
|
||||
}
|
||||
|
||||
private static void write(SequentialFile sequentialFile,
|
||||
EncodingSupport encodingSupport,
|
||||
boolean sync) throws Exception {
|
||||
//this pattern is necessary to ensure that NIO's TimedBuffer fill flush the buffer and know the real size of it
|
||||
if (sequentialFile.fits(encodingSupport.getEncodeSize())) {
|
||||
CALLBACK.reset();
|
||||
sequentialFile.write(encodingSupport, sync, CALLBACK);
|
||||
CALLBACK.waitCompletion();
|
||||
} else {
|
||||
throw new IllegalStateException("can't happen!");
|
||||
}
|
||||
}
|
||||
|
||||
private enum Type {
|
||||
|
||||
Mapped, Nio, Aio
|
||||
|
||||
}
|
||||
|
||||
private static final class FastWaitIOCallback implements IOCallback {
|
||||
|
||||
private final AtomicBoolean done = new AtomicBoolean(false);
|
||||
private int errorCode = 0;
|
||||
private String errorMessage = null;
|
||||
|
||||
public FastWaitIOCallback reset() {
|
||||
errorCode = 0;
|
||||
errorMessage = null;
|
||||
done.lazySet(false);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void done() {
|
||||
errorCode = 0;
|
||||
errorMessage = null;
|
||||
done.lazySet(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
this.errorCode = errorCode;
|
||||
this.errorMessage = errorMessage;
|
||||
done.lazySet(true);
|
||||
}
|
||||
|
||||
public void waitCompletion() throws InterruptedException, ActiveMQException {
|
||||
while (!done.get()) {
|
||||
}
|
||||
if (errorMessage != null) {
|
||||
throw ActiveMQExceptionType.createException(errorCode, errorMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.artemis.core.io.DummyCallback;
|
||||
|
@ -68,19 +69,15 @@ public class TimedBufferTest extends ActiveMQTestBase {
|
|||
class TestObserver implements TimedBufferObserver {
|
||||
|
||||
@Override
|
||||
public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
|
||||
public void flushBuffer(final ByteBuf byteBuf, final boolean sync, final List<IOCallback> callbacks) {
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(byteBuf.readableBytes());
|
||||
buffer.limit(byteBuf.readableBytes());
|
||||
byteBuf.getBytes(byteBuf.readerIndex(), buffer);
|
||||
buffer.flip();
|
||||
buffers.add(buffer);
|
||||
flushTimes.incrementAndGet();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
|
||||
*/
|
||||
@Override
|
||||
public ByteBuffer newBuffer(final int minSize, final int maxSize) {
|
||||
return ByteBuffer.allocate(maxSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRemainingBytes() {
|
||||
return 1024 * 1024;
|
||||
|
@ -135,20 +132,15 @@ public class TimedBufferTest extends ActiveMQTestBase {
|
|||
class TestObserver implements TimedBufferObserver {
|
||||
|
||||
@Override
|
||||
public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
|
||||
public void flushBuffer(final ByteBuf byteBuf, final boolean sync, final List<IOCallback> callbacks) {
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(byteBuf.readableBytes());
|
||||
buffer.limit(byteBuf.readableBytes());
|
||||
byteBuf.getBytes(byteBuf.readerIndex(), buffer);
|
||||
for (IOCallback callback : callbacks) {
|
||||
callback.done();
|
||||
}
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
|
||||
*/
|
||||
@Override
|
||||
public ByteBuffer newBuffer(final int minSize, final int maxSize) {
|
||||
return ByteBuffer.allocate(maxSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRemainingBytes() {
|
||||
return 1024 * 1024;
|
||||
|
@ -262,10 +254,11 @@ public class TimedBufferTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
|
||||
public void flushBuffer(final ByteBuf byteBuf, final boolean sync, final List<IOCallback> callbacks) {
|
||||
assert sync;
|
||||
assert dummyBuffer == buffer;
|
||||
if (buffer.position() > 0) {
|
||||
dummyBuffer.limit(byteBuf.readableBytes());
|
||||
byteBuf.getBytes(byteBuf.readerIndex(), dummyBuffer);
|
||||
if (dummyBuffer.position() > 0) {
|
||||
dummyBuffer.clear();
|
||||
flushes++;
|
||||
//ask the device to perform a flush
|
||||
|
@ -273,16 +266,6 @@ public class TimedBufferTest extends ActiveMQTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
|
||||
*/
|
||||
@Override
|
||||
public ByteBuffer newBuffer(final int minSize, final int maxSize) {
|
||||
assert maxSize <= dummyBuffer.capacity();
|
||||
dummyBuffer.limit(minSize);
|
||||
return dummyBuffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRemainingBytes() {
|
||||
return Integer.MAX_VALUE;
|
||||
|
@ -318,9 +301,10 @@ public class TimedBufferTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
|
||||
public void flushBuffer(final ByteBuf byteBuf, final boolean sync, final List<IOCallback> callbacks) {
|
||||
assert sync;
|
||||
assert dummyBuffer == buffer;
|
||||
dummyBuffer.limit(byteBuf.readableBytes());
|
||||
byteBuf.getBytes(byteBuf.readerIndex(), dummyBuffer);
|
||||
if (dummyBuffer.position() > 0) {
|
||||
dummyBuffer.clear();
|
||||
//emulate the flush time of a blocking device with a precise sleep
|
||||
|
@ -331,16 +315,6 @@ public class TimedBufferTest extends ActiveMQTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
|
||||
*/
|
||||
@Override
|
||||
public ByteBuffer newBuffer(final int minSize, final int maxSize) {
|
||||
assert maxSize <= dummyBuffer.capacity();
|
||||
dummyBuffer.limit(minSize);
|
||||
return dummyBuffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRemainingBytes() {
|
||||
return Integer.MAX_VALUE;
|
||||
|
@ -470,19 +444,15 @@ public class TimedBufferTest extends ActiveMQTestBase {
|
|||
class TestObserver implements TimedBufferObserver {
|
||||
|
||||
@Override
|
||||
public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
|
||||
public void flushBuffer(final ByteBuf byteBuf, final boolean sync, final List<IOCallback> callbacks) {
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(byteBuf.readableBytes());
|
||||
buffer.limit(byteBuf.readableBytes());
|
||||
byteBuf.getBytes(byteBuf.readerIndex(), buffer);
|
||||
buffer.flip();
|
||||
buffers.add(buffer);
|
||||
flushTimes.incrementAndGet();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
|
||||
*/
|
||||
@Override
|
||||
public ByteBuffer newBuffer(final int minSize, final int maxSize) {
|
||||
return ByteBuffer.allocate(maxSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRemainingBytes() {
|
||||
return 1024 * 1024;
|
||||
|
|
Loading…
Reference in New Issue