releasable bytes output + use in transport / translog
create a new releasable bytes output, that can be recycled, and use it in netty and the translog, 2 areas where the recycling will help nicely. Note, opted for statically typed enforced releasble bytes output, to make sure people take the extra care to control when the bytes reference are released. Also, the mock page/array classes were fixed to not take into account potential recycling going during teardown, for example, on a shared cluster ping requests still happen, so recycling happen actively during teardown. closes #5691
This commit is contained in:
parent
a5aafbb04c
commit
d26a956231
|
@ -36,13 +36,17 @@ import java.nio.ByteBuffer;
|
|||
import java.nio.channels.GatheringByteChannel;
|
||||
import java.util.Arrays;
|
||||
|
||||
public final class PagedBytesReference implements BytesReference {
|
||||
/**
|
||||
* A page based bytes reference, internally holding the bytes in a paged
|
||||
* data structure.
|
||||
*/
|
||||
public class PagedBytesReference implements BytesReference {
|
||||
|
||||
private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
|
||||
private static final int NIO_GATHERING_LIMIT = 524288;
|
||||
|
||||
private final BigArrays bigarrays;
|
||||
private final ByteArray bytearray;
|
||||
protected final ByteArray bytearray;
|
||||
private final int offset;
|
||||
private final int length;
|
||||
private int hash = 0;
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.bytes;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
|
||||
/**
|
||||
* A bytes reference that needs to be released once its usage is done.
|
||||
*/
|
||||
public interface ReleasableBytesReference extends BytesReference, Releasable {
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.bytes;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.ByteArray;
|
||||
|
||||
/**
|
||||
* An extension to {@link PagedBytesReference} that requires releasing its content. This
|
||||
* class exists to make it explicit when a bytes reference needs to be released, and when not.
|
||||
*/
|
||||
public class ReleasablePagedBytesReference extends PagedBytesReference implements ReleasableBytesReference {
|
||||
|
||||
public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray bytearray, int length) {
|
||||
super(bigarrays, bytearray, length);
|
||||
}
|
||||
|
||||
public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray bytearray, int from, int length) {
|
||||
super(bigarrays, bytearray, from, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release() throws ElasticsearchException {
|
||||
Releasables.release(bytearray);
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.io;
|
||||
|
||||
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
||||
|
||||
/**
|
||||
* A bytes stream that requires its bytes to be released once no longer used.
|
||||
*/
|
||||
public interface ReleasableBytesStream extends BytesStream {
|
||||
|
||||
ReleasableBytesReference bytes();
|
||||
}
|
|
@ -33,37 +33,31 @@ import java.io.IOException;
|
|||
*/
|
||||
public class BytesStreamOutput extends StreamOutput implements BytesStream {
|
||||
|
||||
/**
|
||||
* Factory/manager for our ByteArray
|
||||
*/
|
||||
private final BigArrays bigarrays;
|
||||
protected final BigArrays bigarrays;
|
||||
|
||||
protected ByteArray bytes;
|
||||
protected int count;
|
||||
|
||||
/**
|
||||
* The internal list of pages.
|
||||
*/
|
||||
private ByteArray bytes;
|
||||
|
||||
/**
|
||||
* The number of valid bytes in the buffer.
|
||||
*/
|
||||
private int count;
|
||||
|
||||
/**
|
||||
* Create a nonrecycling {@link BytesStreamOutput} with 1 initial page acquired.
|
||||
* Create a non recycling {@link BytesStreamOutput} with 1 initial page acquired.
|
||||
*/
|
||||
public BytesStreamOutput() {
|
||||
this(BigArrays.PAGE_SIZE_IN_BYTES);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a nonrecycling {@link BytesStreamOutput} with enough initial pages acquired
|
||||
* to satisfy the capacity given by {@link expectedSize}.
|
||||
* Create a non recycling {@link BytesStreamOutput} with enough initial pages acquired
|
||||
* to satisfy the capacity given by expected size.
|
||||
*
|
||||
* @param expectedSize the expected maximum size of the stream in bytes.
|
||||
*/
|
||||
public BytesStreamOutput(int expectedSize) {
|
||||
bigarrays = BigArrays.NON_RECYCLING_INSTANCE;
|
||||
bytes = bigarrays.newByteArray(expectedSize);
|
||||
this(expectedSize, BigArrays.NON_RECYCLING_INSTANCE);
|
||||
}
|
||||
|
||||
protected BytesStreamOutput(int expectedSize, BigArrays bigarrays) {
|
||||
this.bigarrays = bigarrays;
|
||||
this.bytes = bigarrays.newByteArray(expectedSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.io.stream;
|
||||
|
||||
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
||||
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
|
||||
import org.elasticsearch.common.io.ReleasableBytesStream;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
|
||||
/**
|
||||
* An bytes stream output that allows providing a {@link BigArrays} instance
|
||||
* expecting it to require releasing its content ({@link #bytes()}) once done.
|
||||
* <p/>
|
||||
* Please note, its is the responsibility of the caller to make sure the bytes
|
||||
* reference do not "escape" and are released only once.
|
||||
*/
|
||||
public class ReleasableBytesStreamOutput extends BytesStreamOutput implements ReleasableBytesStream {
|
||||
|
||||
public ReleasableBytesStreamOutput(BigArrays bigarrays) {
|
||||
super(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays);
|
||||
}
|
||||
|
||||
public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigarrays) {
|
||||
super(expectedSize, bigarrays);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReleasableBytesReference bytes() {
|
||||
return new ReleasablePagedBytesReference(bigarrays, bytes, count);
|
||||
}
|
||||
}
|
|
@ -21,12 +21,14 @@ package org.elasticsearch.index.translog.fs;
|
|||
|
||||
import jsr166y.ThreadLocalRandom;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.settings.IndexSettingsService;
|
||||
|
@ -62,6 +64,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
|||
}
|
||||
|
||||
private final IndexSettingsService indexSettingsService;
|
||||
private final BigArrays bigArrays;
|
||||
|
||||
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
|
||||
private final File[] locations;
|
||||
|
@ -79,9 +82,10 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
|||
private final ApplySettings applySettings = new ApplySettings();
|
||||
|
||||
@Inject
|
||||
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, NodeEnvironment nodeEnv) {
|
||||
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, NodeEnvironment nodeEnv, BigArrays bigArrays) {
|
||||
super(shardId, indexSettings);
|
||||
this.indexSettingsService = indexSettingsService;
|
||||
this.bigArrays = bigArrays;
|
||||
File[] shardLocations = nodeEnv.shardLocations(shardId);
|
||||
this.locations = new File[shardLocations.length];
|
||||
for (int i = 0; i < shardLocations.length; i++) {
|
||||
|
@ -101,6 +105,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
|||
this.indexSettingsService = null;
|
||||
this.locations = new File[]{location};
|
||||
FileSystemUtils.mkdirs(location);
|
||||
this.bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
|
||||
|
||||
this.type = FsTranslogFile.Type.fromString(componentSettings.get("type", FsTranslogFile.Type.BUFFERED.name()));
|
||||
}
|
||||
|
@ -335,8 +340,9 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
|||
@Override
|
||||
public Location add(Operation operation) throws TranslogException {
|
||||
rwl.readLock().lock();
|
||||
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
|
||||
boolean released = false;
|
||||
try {
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
out.writeInt(0); // marker for the size...
|
||||
TranslogStreams.writeTranslogOperation(out, operation);
|
||||
out.flush();
|
||||
|
@ -345,11 +351,11 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
|||
int size = out.size();
|
||||
out.seek(0);
|
||||
out.writeInt(size - 4);
|
||||
|
||||
|
||||
// seek back to end
|
||||
out.seek(size);
|
||||
|
||||
BytesReference bytes = out.bytes();
|
||||
ReleasableBytesReference bytes = out.bytes();
|
||||
Location location = current.add(bytes);
|
||||
if (syncOnEachOperation) {
|
||||
current.sync();
|
||||
|
@ -362,11 +368,16 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
|||
// ignore
|
||||
}
|
||||
}
|
||||
Releasables.release(bytes);
|
||||
released = true;
|
||||
return location;
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
|
||||
} finally {
|
||||
rwl.readLock().unlock();
|
||||
if (!released) {
|
||||
Releasables.release(out.bytes());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,15 +27,19 @@ import org.elasticsearch.ElasticsearchIllegalStateException;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.math.MathUtils;
|
||||
import org.elasticsearch.common.netty.NettyStaticSetup;
|
||||
import org.elasticsearch.common.netty.OpenChannelsHandler;
|
||||
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.network.NetworkUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -45,6 +49,7 @@ import org.elasticsearch.common.transport.PortsRange;
|
|||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.KeyedLock;
|
||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||
|
@ -140,6 +145,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
final ByteSizeValue maxCumulationBufferCapacity;
|
||||
final int maxCompositeBufferComponents;
|
||||
|
||||
final BigArrays bigArrays;
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private volatile OpenChannelsHandler serverOpenChannels;
|
||||
|
@ -165,10 +172,11 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
private final ReadWriteLock globalLock = new ReentrantReadWriteLock();
|
||||
|
||||
@Inject
|
||||
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, Version version) {
|
||||
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.networkService = networkService;
|
||||
this.bigArrays = bigArrays;
|
||||
this.version = version;
|
||||
|
||||
if (settings.getAsBoolean("netty.epollBugWorkaround", false)) {
|
||||
|
@ -547,58 +555,58 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
byte status = 0;
|
||||
status = TransportStatus.setRequest(status);
|
||||
|
||||
BytesStreamOutput bStream = new BytesStreamOutput();
|
||||
bStream.skip(NettyHeader.HEADER_SIZE);
|
||||
StreamOutput stream = bStream;
|
||||
// only compress if asked, and, the request is not bytes, since then only
|
||||
// the header part is compressed, and the "body" can't be extracted as compressed
|
||||
if (options.compress() && (!(request instanceof BytesTransportRequest))) {
|
||||
status = TransportStatus.setCompress(status);
|
||||
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
|
||||
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
|
||||
boolean addedReleaseListener = false;
|
||||
try {
|
||||
bStream.skip(NettyHeader.HEADER_SIZE);
|
||||
StreamOutput stream = bStream;
|
||||
// only compress if asked, and, the request is not bytes, since then only
|
||||
// the header part is compressed, and the "body" can't be extracted as compressed
|
||||
if (options.compress() && (!(request instanceof BytesTransportRequest))) {
|
||||
status = TransportStatus.setCompress(status);
|
||||
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
|
||||
}
|
||||
stream = new HandlesStreamOutput(stream);
|
||||
|
||||
// we pick the smallest of the 2, to support both backward and forward compatibility
|
||||
// note, this is the only place we need to do this, since from here on, we use the serialized version
|
||||
// as the version to use also when the node receiving this request will send the response with
|
||||
Version version = Version.smallest(this.version, node.version());
|
||||
|
||||
stream.setVersion(version);
|
||||
stream.writeString(action);
|
||||
|
||||
ReleasableBytesReference bytes;
|
||||
ChannelBuffer buffer;
|
||||
// it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output
|
||||
// that create paged channel buffers, but its tricky to know when to do it (where this option is
|
||||
// more explicit).
|
||||
if (request instanceof BytesTransportRequest) {
|
||||
BytesTransportRequest bRequest = (BytesTransportRequest) request;
|
||||
assert node.version().equals(bRequest.version());
|
||||
bRequest.writeThin(stream);
|
||||
stream.close();
|
||||
bytes = bStream.bytes();
|
||||
ChannelBuffer headerBuffer = bytes.toChannelBuffer();
|
||||
ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
|
||||
// false on gathering, cause gathering causes the NIO layer to combine the buffers into a single direct buffer....
|
||||
buffer = new CompositeChannelBuffer(headerBuffer.order(), ImmutableList.<ChannelBuffer>of(headerBuffer, contentBuffer), false);
|
||||
} else {
|
||||
request.writeTo(stream);
|
||||
stream.close();
|
||||
bytes = bStream.bytes();
|
||||
buffer = bytes.toChannelBuffer();
|
||||
}
|
||||
NettyHeader.writeHeader(buffer, requestId, status, version);
|
||||
ChannelFuture future = targetChannel.write(buffer);
|
||||
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
|
||||
future.addListener(listener);
|
||||
addedReleaseListener = true;
|
||||
} finally {
|
||||
if (!addedReleaseListener) {
|
||||
Releasables.release(bStream.bytes());
|
||||
}
|
||||
}
|
||||
stream = new HandlesStreamOutput(stream);
|
||||
|
||||
// we pick the smallest of the 2, to support both backward and forward compatibility
|
||||
// note, this is the only place we need to do this, since from here on, we use the serialized version
|
||||
// as the version to use also when the node receiving this request will send the response with
|
||||
Version version = Version.smallest(this.version, node.version());
|
||||
|
||||
stream.setVersion(version);
|
||||
stream.writeString(action);
|
||||
|
||||
ChannelBuffer buffer;
|
||||
// it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output
|
||||
// that create paged channel buffers, but its tricky to know when to do it (where this option is
|
||||
// more explicit).
|
||||
if (request instanceof BytesTransportRequest) {
|
||||
BytesTransportRequest bRequest = (BytesTransportRequest) request;
|
||||
assert node.version().equals(bRequest.version());
|
||||
bRequest.writeThin(stream);
|
||||
stream.close();
|
||||
ChannelBuffer headerBuffer = bStream.bytes().toChannelBuffer();
|
||||
ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
|
||||
// false on gathering, cause gathering causes the NIO layer to combine the buffers into a single direct buffer....
|
||||
buffer = new CompositeChannelBuffer(headerBuffer.order(), ImmutableList.<ChannelBuffer>of(headerBuffer, contentBuffer), false);
|
||||
} else {
|
||||
request.writeTo(stream);
|
||||
stream.close();
|
||||
buffer = bStream.bytes().toChannelBuffer();
|
||||
}
|
||||
NettyHeader.writeHeader(buffer, requestId, status, version);
|
||||
targetChannel.write(buffer);
|
||||
|
||||
// We handle close connection exception in the #exceptionCaught method, which is the main reason we want to add this future
|
||||
// channelFuture.addListener(new ChannelFutureListener() {
|
||||
// @Override public void operationComplete(ChannelFuture future) throws Exception {
|
||||
// if (!future.isSuccess()) {
|
||||
// // maybe add back the retry?
|
||||
// TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
|
||||
// if (handler != null) {
|
||||
// handler.handleException(new RemoteTransportException("Failed write request", new SendRequestTransportException(node, action, future.getCause())));
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// });
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,15 +20,19 @@
|
|||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
import org.elasticsearch.common.io.ThrowableObjectOutputStream;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
|
||||
import org.elasticsearch.transport.*;
|
||||
import org.elasticsearch.transport.support.TransportStatus;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.NotSerializableException;
|
||||
|
@ -71,47 +75,69 @@ public class NettyTransportChannel implements TransportChannel {
|
|||
byte status = 0;
|
||||
status = TransportStatus.setResponse(status);
|
||||
|
||||
BytesStreamOutput bStream = new BytesStreamOutput();
|
||||
bStream.skip(NettyHeader.HEADER_SIZE);
|
||||
StreamOutput stream = bStream;
|
||||
if (options.compress()) {
|
||||
status = TransportStatus.setCompress(status);
|
||||
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
|
||||
}
|
||||
stream = new HandlesStreamOutput(stream);
|
||||
stream.setVersion(version);
|
||||
response.writeTo(stream);
|
||||
stream.close();
|
||||
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(transport.bigArrays);
|
||||
boolean addedReleaseListener = false;
|
||||
try {
|
||||
bStream.skip(NettyHeader.HEADER_SIZE);
|
||||
StreamOutput stream = bStream;
|
||||
if (options.compress()) {
|
||||
status = TransportStatus.setCompress(status);
|
||||
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
|
||||
}
|
||||
stream = new HandlesStreamOutput(stream);
|
||||
stream.setVersion(version);
|
||||
response.writeTo(stream);
|
||||
stream.close();
|
||||
|
||||
ChannelBuffer buffer = bStream.bytes().toChannelBuffer();
|
||||
NettyHeader.writeHeader(buffer, requestId, status, version);
|
||||
channel.write(buffer);
|
||||
ReleasableBytesReference bytes = bStream.bytes();
|
||||
ChannelBuffer buffer = bytes.toChannelBuffer();
|
||||
NettyHeader.writeHeader(buffer, requestId, status, version);
|
||||
ChannelFuture future = channel.write(buffer);
|
||||
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
|
||||
future.addListener(listener);
|
||||
addedReleaseListener = true;
|
||||
} finally {
|
||||
if (!addedReleaseListener) {
|
||||
Releasables.release(bStream.bytes());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(Throwable error) throws IOException {
|
||||
BytesStreamOutput stream = new BytesStreamOutput();
|
||||
ReleasableBytesStreamOutput stream = new ReleasableBytesStreamOutput(transport.bigArrays);
|
||||
boolean addedReleaseListener = false;
|
||||
try {
|
||||
stream.skip(NettyHeader.HEADER_SIZE);
|
||||
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error);
|
||||
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
|
||||
too.writeObject(tx);
|
||||
too.close();
|
||||
} catch (NotSerializableException e) {
|
||||
stream.reset();
|
||||
stream.skip(NettyHeader.HEADER_SIZE);
|
||||
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error));
|
||||
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
|
||||
too.writeObject(tx);
|
||||
too.close();
|
||||
try {
|
||||
stream.skip(NettyHeader.HEADER_SIZE);
|
||||
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error);
|
||||
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
|
||||
too.writeObject(tx);
|
||||
too.close();
|
||||
} catch (NotSerializableException e) {
|
||||
stream.reset();
|
||||
stream.skip(NettyHeader.HEADER_SIZE);
|
||||
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error));
|
||||
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
|
||||
too.writeObject(tx);
|
||||
too.close();
|
||||
}
|
||||
|
||||
byte status = 0;
|
||||
status = TransportStatus.setResponse(status);
|
||||
status = TransportStatus.setError(status);
|
||||
|
||||
ReleasableBytesReference bytes = stream.bytes();
|
||||
ChannelBuffer buffer = bytes.toChannelBuffer();
|
||||
NettyHeader.writeHeader(buffer, requestId, status, version);
|
||||
ChannelFuture future = channel.write(buffer);
|
||||
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
|
||||
future.addListener(listener);
|
||||
addedReleaseListener = true;
|
||||
} finally {
|
||||
if (!addedReleaseListener) {
|
||||
Releasables.release(stream.bytes());
|
||||
}
|
||||
}
|
||||
|
||||
byte status = 0;
|
||||
status = TransportStatus.setResponse(status);
|
||||
status = TransportStatus.setError(status);
|
||||
|
||||
ChannelBuffer buffer = stream.bytes().toChannelBuffer();
|
||||
NettyHeader.writeHeader(buffer, requestId, status, version);
|
||||
channel.write(buffer);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.*;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
|
@ -53,8 +54,8 @@ public class BenchmarkNettyLargeMessages {
|
|||
NetworkService networkService = new NetworkService(settings);
|
||||
|
||||
final ThreadPool threadPool = new ThreadPool();
|
||||
final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool, networkService, Version.CURRENT), threadPool).start();
|
||||
final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool, networkService, Version.CURRENT), threadPool).start();
|
||||
final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT), threadPool).start();
|
||||
final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT), threadPool).start();
|
||||
|
||||
final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress("localhost", 9300), Version.CURRENT);
|
||||
// final DiscoveryNode smallNode = new DiscoveryNode("small", new InetSocketTransportAddress("localhost", 9300));
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.*;
|
||||
import org.elasticsearch.transport.local.LocalTransport;
|
||||
|
@ -50,7 +51,7 @@ public class TransportBenchmark {
|
|||
NETTY {
|
||||
@Override
|
||||
public Transport newTransport(Settings settings, ThreadPool threadPool) {
|
||||
return new NettyTransport(settings, threadPool, new NetworkService(ImmutableSettings.EMPTY), Version.CURRENT);
|
||||
return new NettyTransport(settings, threadPool, new NetworkService(ImmutableSettings.EMPTY), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
package org.elasticsearch.common.util;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.cache.recycler.MockPageCacheRecycler;
|
||||
import org.elasticsearch.test.cache.recycler.MockPageCacheRecycler;
|
||||
import org.elasticsearch.cache.recycler.PageCacheRecycler;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
|
||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
||||
import org.elasticsearch.node.service.NodeService;
|
||||
|
@ -55,13 +56,13 @@ public class UnicastZenPingTests extends ElasticsearchTestCase {
|
|||
ClusterName clusterName = new ClusterName("test");
|
||||
NetworkService networkService = new NetworkService(settings);
|
||||
|
||||
NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, Version.CURRENT);
|
||||
NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
|
||||
final TransportService transportServiceA = new TransportService(transportA, threadPool).start();
|
||||
final DiscoveryNode nodeA = new DiscoveryNode("UZP_A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
|
||||
|
||||
InetSocketTransportAddress addressA = (InetSocketTransportAddress) transportA.boundAddress().publishAddress();
|
||||
|
||||
NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, Version.CURRENT);
|
||||
NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
|
||||
final TransportService transportServiceB = new TransportService(transportB, threadPool).start();
|
||||
final DiscoveryNode nodeB = new DiscoveryNode("UZP_B", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.lucene.util.AbstractRandomizedTest;
|
|||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.TimeUnits;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cache.recycler.MockPageCacheRecycler;
|
||||
import org.elasticsearch.test.cache.recycler.MockPageCacheRecycler;
|
||||
import org.elasticsearch.client.Requests;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
|
@ -134,7 +134,7 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
|
|||
}
|
||||
|
||||
@After
|
||||
public void ensureAllPagesReleased() {
|
||||
public void ensureAllPagesReleased() throws Exception {
|
||||
MockPageCacheRecycler.ensureAllPagesAreReleased();
|
||||
}
|
||||
|
||||
|
@ -145,7 +145,7 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
|
|||
}
|
||||
|
||||
@After
|
||||
public void ensureAllArraysReleased() {
|
||||
public void ensureAllArraysReleased() throws Exception {
|
||||
MockBigArrays.ensureAllArraysAreReleased();
|
||||
}
|
||||
|
||||
|
|
|
@ -21,12 +21,17 @@ package org.elasticsearch.test.cache.recycler;
|
|||
|
||||
import com.carrotsearch.randomizedtesting.RandomizedContext;
|
||||
import com.carrotsearch.randomizedtesting.SeedUtils;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.cache.recycler.PageCacheRecycler;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.*;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
@ -56,12 +61,31 @@ public class MockBigArrays extends BigArrays {
|
|||
ACQUIRED_ARRAYS.clear();
|
||||
}
|
||||
|
||||
public static void ensureAllArraysAreReleased() {
|
||||
public static void ensureAllArraysAreReleased() throws Exception {
|
||||
if (DISCARD) {
|
||||
DISCARD = false;
|
||||
} else if (ACQUIRED_ARRAYS.size() > 0) {
|
||||
final Object cause = ACQUIRED_ARRAYS.entrySet().iterator().next().getValue();
|
||||
throw new RuntimeException(ACQUIRED_ARRAYS.size() + " arrays have not been released", cause instanceof Throwable ? (Throwable) cause : null);
|
||||
} else {
|
||||
final Map<Object, Object> masterCopy = Maps.newHashMap(ACQUIRED_ARRAYS);
|
||||
if (masterCopy.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
// not empty, we might be executing on a shared cluster that keeps on obtaining
|
||||
// and releasing arrays, lets make sure that after a reasonable timeout, all master
|
||||
// copy (snapshot) have been released
|
||||
boolean success = ElasticsearchTestCase.awaitBusy(new Predicate<Object>() {
|
||||
@Override
|
||||
public boolean apply(Object input) {
|
||||
return Sets.intersection(masterCopy.keySet(), ACQUIRED_ARRAYS.keySet()).isEmpty();
|
||||
}
|
||||
});
|
||||
if (success) {
|
||||
return;
|
||||
}
|
||||
masterCopy.keySet().retainAll(ACQUIRED_ARRAYS.keySet());
|
||||
if (!masterCopy.isEmpty()) {
|
||||
final Object cause = masterCopy.entrySet().iterator().next().getValue();
|
||||
throw new RuntimeException(masterCopy.size() + " arrays have not been released", cause instanceof Throwable ? (Throwable) cause : null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,17 +17,22 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cache.recycler;
|
||||
package org.elasticsearch.test.cache.recycler;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cache.recycler.PageCacheRecycler;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.recycler.Recycler.V;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
import org.elasticsearch.test.TestCluster;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.lang.reflect.Array;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
|
@ -39,10 +44,27 @@ public class MockPageCacheRecycler extends PageCacheRecycler {
|
|||
ACQUIRED_PAGES.clear();
|
||||
}
|
||||
|
||||
public static void ensureAllPagesAreReleased() {
|
||||
if (ACQUIRED_PAGES.size() > 0) {
|
||||
final Throwable t = ACQUIRED_PAGES.entrySet().iterator().next().getValue();
|
||||
throw new RuntimeException(ACQUIRED_PAGES.size() + " pages have not been released", t);
|
||||
public static void ensureAllPagesAreReleased() throws Exception {
|
||||
final Map<Object, Throwable> masterCopy = Maps.newHashMap(ACQUIRED_PAGES);
|
||||
if (masterCopy.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
// not empty, we might be executing on a shared cluster that keeps on obtaining
|
||||
// and releasing pages, lets make sure that after a reasonable timeout, all master
|
||||
// copy (snapshot) have been released
|
||||
boolean success = ElasticsearchTestCase.awaitBusy(new Predicate<Object>() {
|
||||
@Override
|
||||
public boolean apply(Object input) {
|
||||
return Sets.intersection(masterCopy.keySet(), ACQUIRED_PAGES.keySet()).isEmpty();
|
||||
}
|
||||
});
|
||||
if (success) {
|
||||
return;
|
||||
}
|
||||
masterCopy.keySet().retainAll(ACQUIRED_PAGES.keySet());
|
||||
if (!masterCopy.isEmpty()) {
|
||||
final Throwable t = masterCopy.entrySet().iterator().next().getValue();
|
||||
throw new RuntimeException(masterCopy.size() + " pages have not been released", t);
|
||||
}
|
||||
}
|
||||
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.test.cache.recycler;
|
||||
|
||||
import org.elasticsearch.cache.recycler.MockPageCacheRecycler;
|
||||
import org.elasticsearch.cache.recycler.PageCacheRecycler;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.common.network.NetworkService;
|
|||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.AbstractSimpleTransportTests;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
|
@ -37,7 +38,7 @@ public class SimpleNettyTransportTests extends AbstractSimpleTransportTests {
|
|||
int startPort = 11000 + randomIntBetween(0, 255);
|
||||
int endPort = startPort + 10;
|
||||
settings = ImmutableSettings.builder().put(settings).put("transport.tcp.port", startPort + "-" + endPort).build();
|
||||
MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), version), threadPool);
|
||||
MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, version), threadPool);
|
||||
transportService.start();
|
||||
return transportService;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue