When slicing a releasable bytes reference we would create a new counter every time and pass the original reference chain to the new slice on every slice invocation. This would lead to extremely deep reference chains and needlessly uses a dedicated counter for every slice when all the slices eventually just refer to the same underlying bytes and `Releasable`. This commit tracks the ref count wrapper with its releasable in a separate object that can be passed around on every slicing, making the slices' tree as flat as the original releasable bytes reference. Also, we were needlessly creating a redundant releasable bytes reference from a releasable bytes-stream-output that we never actually used for releasing (all code that uses it just releases the stream itself instead).
This commit is contained in:
parent
56401d3f66
commit
5569137ae3
|
@ -23,7 +23,6 @@ import org.apache.lucene.util.BytesRef;
|
||||||
import org.apache.lucene.util.BytesRefIterator;
|
import org.apache.lucene.util.BytesRefIterator;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.lease.Releasable;
|
import org.elasticsearch.common.lease.Releasable;
|
||||||
import org.elasticsearch.common.lease.Releasables;
|
|
||||||
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
|
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
|
||||||
|
@ -34,41 +33,43 @@ import java.io.OutputStream;
|
||||||
* An extension to {@link BytesReference} that requires releasing its content. This
|
* An extension to {@link BytesReference} that requires releasing its content. This
|
||||||
* class exists to make it explicit when a bytes reference needs to be released, and when not.
|
* class exists to make it explicit when a bytes reference needs to be released, and when not.
|
||||||
*/
|
*/
|
||||||
public final class ReleasableBytesReference extends AbstractRefCounted implements Releasable, BytesReference {
|
public final class ReleasableBytesReference implements Releasable, BytesReference {
|
||||||
|
|
||||||
public static final Releasable NO_OP = () -> {};
|
public static final Releasable NO_OP = () -> {};
|
||||||
private final BytesReference delegate;
|
private final BytesReference delegate;
|
||||||
private final Releasable releasable;
|
private final AbstractRefCounted refCounted;
|
||||||
|
|
||||||
public ReleasableBytesReference(BytesReference delegate, Releasable releasable) {
|
public ReleasableBytesReference(BytesReference delegate, Releasable releasable) {
|
||||||
super("bytes-reference");
|
|
||||||
this.delegate = delegate;
|
this.delegate = delegate;
|
||||||
this.releasable = releasable;
|
this.refCounted = new RefCountedReleasable(releasable);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ReleasableBytesReference(BytesReference delegate, AbstractRefCounted refCounted) {
|
||||||
|
this.delegate = delegate;
|
||||||
|
this.refCounted = refCounted;
|
||||||
|
refCounted.incRef();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ReleasableBytesReference wrap(BytesReference reference) {
|
public static ReleasableBytesReference wrap(BytesReference reference) {
|
||||||
return new ReleasableBytesReference(reference, NO_OP);
|
return new ReleasableBytesReference(reference, NO_OP);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
public int refCount() {
|
||||||
protected void closeInternal() {
|
return refCounted.refCount();
|
||||||
Releasables.close(releasable);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ReleasableBytesReference retain() {
|
public ReleasableBytesReference retain() {
|
||||||
incRef();
|
refCounted.incRef();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ReleasableBytesReference retainedSlice(int from, int length) {
|
public ReleasableBytesReference retainedSlice(int from, int length) {
|
||||||
BytesReference slice = delegate.slice(from, length);
|
return new ReleasableBytesReference(delegate.slice(from, length), refCounted);
|
||||||
incRef();
|
|
||||||
return new ReleasableBytesReference(slice, this);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
decRef();
|
refCounted.decRef();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -150,4 +151,19 @@ public final class ReleasableBytesReference extends AbstractRefCounted implement
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return delegate.hashCode();
|
return delegate.hashCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final class RefCountedReleasable extends AbstractRefCounted {
|
||||||
|
|
||||||
|
private final Releasable releasable;
|
||||||
|
|
||||||
|
RefCountedReleasable(Releasable releasable) {
|
||||||
|
super("bytes-reference");
|
||||||
|
this.releasable = releasable;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void closeInternal() {
|
||||||
|
releasable.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.common.io.stream;
|
package org.elasticsearch.common.io.stream;
|
||||||
|
|
||||||
import org.elasticsearch.common.bytes.PagedBytesReference;
|
|
||||||
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
||||||
import org.elasticsearch.common.lease.Releasable;
|
import org.elasticsearch.common.lease.Releasable;
|
||||||
import org.elasticsearch.common.lease.Releasables;
|
import org.elasticsearch.common.lease.Releasables;
|
||||||
|
@ -50,16 +49,6 @@ public class ReleasableBytesStreamOutput extends BytesStreamOutput
|
||||||
this.releasable = Releasables.releaseOnce(this.bytes);
|
this.releasable = Releasables.releaseOnce(this.bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a {@link Releasable} implementation of a
|
|
||||||
* {@link org.elasticsearch.common.bytes.BytesReference} that represents the current state of
|
|
||||||
* the bytes in the stream.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public ReleasableBytesReference bytes() {
|
|
||||||
return new ReleasableBytesReference(new PagedBytesReference(bytes, count), releasable);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
Releasables.close(releasable);
|
Releasables.close(releasable);
|
||||||
|
|
|
@ -28,7 +28,6 @@ import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.UUIDs;
|
import org.elasticsearch.common.UUIDs;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
|
||||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
@ -531,7 +530,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
out.seek(start);
|
out.seek(start);
|
||||||
out.writeInt(operationSize);
|
out.writeInt(operationSize);
|
||||||
out.seek(end);
|
out.seek(end);
|
||||||
final ReleasableBytesReference bytes = out.bytes();
|
final BytesReference bytes = out.bytes();
|
||||||
try (ReleasableLock ignored = readLock.acquire()) {
|
try (ReleasableLock ignored = readLock.acquire()) {
|
||||||
ensureOpen();
|
ensureOpen();
|
||||||
if (operation.primaryTerm() > current.getPrimaryTerm()) {
|
if (operation.primaryTerm() > current.getPrimaryTerm()) {
|
||||||
|
@ -1574,8 +1573,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
out.seek(start);
|
out.seek(start);
|
||||||
out.writeInt(operationSize);
|
out.writeInt(operationSize);
|
||||||
out.seek(end);
|
out.seek(end);
|
||||||
ReleasableBytesReference bytes = out.bytes();
|
out.bytes().writeTo(outStream);
|
||||||
bytes.writeTo(outStream);
|
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
Releasables.close(out);
|
Releasables.close(out);
|
||||||
|
|
|
@ -46,7 +46,7 @@ public class InboundPipeline implements Releasable {
|
||||||
private final InboundAggregator aggregator;
|
private final InboundAggregator aggregator;
|
||||||
private final BiConsumer<TcpChannel, InboundMessage> messageHandler;
|
private final BiConsumer<TcpChannel, InboundMessage> messageHandler;
|
||||||
private Exception uncaughtException;
|
private Exception uncaughtException;
|
||||||
private ArrayDeque<ReleasableBytesReference> pending = new ArrayDeque<>(2);
|
private final ArrayDeque<ReleasableBytesReference> pending = new ArrayDeque<>(2);
|
||||||
private boolean isClosed = false;
|
private boolean isClosed = false;
|
||||||
|
|
||||||
public InboundPipeline(Version version, StatsTracker statsTracker, PageCacheRecycler recycler, LongSupplier relativeTimeInMillis,
|
public InboundPipeline(Version version, StatsTracker statsTracker, PageCacheRecycler recycler, LongSupplier relativeTimeInMillis,
|
||||||
|
|
|
@ -591,7 +591,7 @@ public abstract class AbstractBytesReferenceTestCase extends ESTestCase {
|
||||||
for (int j = crazyStream.size(); j < crazyLength; j++) {
|
for (int j = crazyStream.size(); j < crazyLength; j++) {
|
||||||
crazyStream.writeByte((byte) random().nextInt(1 << 8));
|
crazyStream.writeByte((byte) random().nextInt(1 << 8));
|
||||||
}
|
}
|
||||||
ReleasableBytesReference crazyReference = crazyStream.bytes();
|
BytesReference crazyReference = crazyStream.bytes();
|
||||||
|
|
||||||
assertFalse(crazyReference.compareTo(bytesReference) == 0);
|
assertFalse(crazyReference.compareTo(bytesReference) == 0);
|
||||||
assertEquals(0, crazyReference.slice(offset, length).compareTo(
|
assertEquals(0, crazyReference.slice(offset, length).compareTo(
|
||||||
|
|
Loading…
Reference in New Issue