From d1f5577c404c3d6de12f7935dda8b28e29f743d3 Mon Sep 17 00:00:00 2001 From: kimchy Date: Fri, 9 Jul 2010 03:51:36 +0300 Subject: [PATCH] add channel based read from fs snapshot, reusing the same file descriptor with direct position based reads --- .../index/translog/fs/FsChannelSnapshot.java | 121 ++++++++++++++++++ ...{FsSnapshot.java => FsStreamSnapshot.java} | 4 +- .../index/translog/fs/FsTranslog.java | 33 ++++- .../fs/FsChannelSimpleTranslogTests.java | 43 +++++++ ....java => FsStreamSimpleTranslogTests.java} | 4 +- 5 files changed, 194 insertions(+), 11 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java rename modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/{FsSnapshot.java => FsStreamSnapshot.java} (95%) create mode 100644 modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/fs/FsChannelSimpleTranslogTests.java rename modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/fs/{FsSimpleTranslogTests.java => FsStreamSimpleTranslogTests.java} (93%) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java new file mode 100644 index 00000000000..084cdf989e9 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java @@ -0,0 +1,121 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog.fs; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.common.io.stream.BytesStreamInput; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogStreams; + +import java.io.FileNotFoundException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +/** + * @author kimchy (shay.banon) + */ +public class FsChannelSnapshot implements Translog.Snapshot { + + private final ShardId shardId; + + private final long id; + + private final RafReference raf; + + private final FileChannel channel; + + private final long length; + + private Translog.Operation lastOperationRead = null; + + private int position = 0; + + private ByteBuffer cacheBuffer; + + public FsChannelSnapshot(ShardId shardId, long id, RafReference raf, long length) throws FileNotFoundException { + this.shardId = shardId; + this.id = id; + this.raf = raf; + this.channel = raf.raf().getChannel(); + this.length = length; + } + + @Override public long translogId() { + return this.id; + } + + @Override public long position() { + return this.position; + } + + @Override public long length() { + return this.length; + } + + @Override public boolean hasNext() { + try { + if (position > length) { + return false; + } + if (cacheBuffer == null) { + cacheBuffer = ByteBuffer.allocate(1024); + } + cacheBuffer.limit(4); + int bytesRead = channel.read(cacheBuffer, position); + if (bytesRead < 4) { + return false; + } + cacheBuffer.flip(); + int opSize = cacheBuffer.getInt(); + position += 4; + if ((position + opSize) > length) { + // restore the position to before we read the opSize + position -= 4; + return false; + } + if (cacheBuffer.capacity() < opSize) { + cacheBuffer = ByteBuffer.allocate(opSize); + } + cacheBuffer.clear(); + cacheBuffer.limit(opSize); + channel.read(cacheBuffer, position); + cacheBuffer.flip(); + position += opSize; + lastOperationRead = TranslogStreams.readTranslogOperation(new BytesStreamInput(cacheBuffer.array(), 0, opSize)); + return true; + } catch (Exception e) { + return false; + } + } + + @Override public Translog.Operation next() { + return this.lastOperationRead; + } + + @Override public void seekForward(long length) { + this.position += length; + } + + @Override public boolean release() throws ElasticSearchException { + raf.decreaseRefCount(); + return true; + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsSnapshot.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsStreamSnapshot.java similarity index 95% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsSnapshot.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsStreamSnapshot.java index 51b5212d838..fd9e1f4e506 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsSnapshot.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsStreamSnapshot.java @@ -34,7 +34,7 @@ import java.io.IOException; /** * @author kimchy (shay.banon) */ -public class FsSnapshot implements Translog.Snapshot { +public class FsStreamSnapshot implements Translog.Snapshot { private final ShardId shardId; @@ -52,7 +52,7 @@ public class FsSnapshot implements Translog.Snapshot { private byte[] cachedData; - public FsSnapshot(ShardId shardId, long id, RafReference raf, long length) throws FileNotFoundException { + public FsStreamSnapshot(ShardId shardId, long id, RafReference raf, long length) throws FileNotFoundException { this.shardId = shardId; this.id = id; this.raf = raf; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index f24b54ccb84..41047c4df62 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -46,6 +46,8 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog private final File location; + private final boolean useStream; + private final Object mutex = new Object(); private volatile long id; @@ -57,13 +59,21 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog private volatile SoftReference cachedBos = new SoftReference(new FastByteArrayOutputStream()); @Inject public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) { - this(shardId, indexSettings, new File(new File(new File(new File(nodeEnv.nodeFile(), "indices"), shardId.index().name()), Integer.toString(shardId.id())), "translog")); + super(shardId, indexSettings); + this.location = new File(new File(new File(new File(nodeEnv.nodeFile(), "indices"), shardId.index().name()), Integer.toString(shardId.id())), "translog"); + this.location.mkdirs(); + this.useStream = componentSettings.getAsBoolean("use_stream", false); } public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, File location) { + this(shardId, indexSettings, location, false); + } + + public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, File location, boolean useStream) { super(shardId, indexSettings); this.location = location; - location.mkdirs(); + this.location.mkdirs(); + this.useStream = useStream; } @Override public long currentId() { @@ -119,7 +129,11 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog synchronized (mutex) { try { raf.increaseRefCount(); - return new FsSnapshot(shardId, this.id, raf, raf.raf().getFilePointer()); + if (useStream) { + return new FsStreamSnapshot(shardId, this.id, raf, raf.raf().getFilePointer()); + } else { + return new FsChannelSnapshot(shardId, this.id, raf, raf.raf().getFilePointer()); + } } catch (IOException e) { throw new TranslogException(shardId, "Failed to snapshot", e); } @@ -132,11 +146,16 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog return snapshot(); } try { - FsSnapshot fsSnapshot = (FsSnapshot) snapshot; raf.increaseRefCount(); - FsSnapshot newSnapshot = new FsSnapshot(shardId, id, raf, raf.raf().getFilePointer()); - newSnapshot.seekForward(fsSnapshot.position()); - return newSnapshot; + if (useStream) { + FsStreamSnapshot newSnapshot = new FsStreamSnapshot(shardId, id, raf, raf.raf().getFilePointer()); + newSnapshot.seekForward(snapshot.position()); + return newSnapshot; + } else { + FsChannelSnapshot newSnapshot = new FsChannelSnapshot(shardId, id, raf, raf.raf().getFilePointer()); + newSnapshot.seekForward(snapshot.position()); + return newSnapshot; + } } catch (IOException e) { throw new TranslogException(shardId, "Failed to snapshot", e); } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/fs/FsChannelSimpleTranslogTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/fs/FsChannelSimpleTranslogTests.java new file mode 100644 index 00000000000..0c01c759600 --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/fs/FsChannelSimpleTranslogTests.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog.fs; + +import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.index.translog.AbstractSimpleTranslogTests; +import org.elasticsearch.index.translog.Translog; +import org.testng.annotations.AfterTest; + +import java.io.File; + +import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; + +/** + * @author kimchy (shay.banon) + */ +public class FsChannelSimpleTranslogTests extends AbstractSimpleTranslogTests { + + @Override protected Translog create() { + return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false); + } + + @AfterTest public void cleanup() { + FileSystemUtils.deleteRecursively(new File("work/fs-translog"), true); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/fs/FsSimpleTranslogTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/fs/FsStreamSimpleTranslogTests.java similarity index 93% rename from modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/fs/FsSimpleTranslogTests.java rename to modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/fs/FsStreamSimpleTranslogTests.java index 3c5f6a48fb0..7917477a85b 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/fs/FsSimpleTranslogTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/fs/FsStreamSimpleTranslogTests.java @@ -31,10 +31,10 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; /** * @author kimchy (shay.banon) */ -public class FsSimpleTranslogTests extends AbstractSimpleTranslogTests { +public class FsStreamSimpleTranslogTests extends AbstractSimpleTranslogTests { @Override protected Translog create() { - return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog")); + return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), true); } @AfterTest public void cleanup() {