From 26fc9bcb251034d183be0b3c5b516163af8ec012 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Fri, 16 Dec 2011 23:30:31 +0200 Subject: [PATCH] abstract away the fs translog file to an interface --- .../index/translog/fs/FsTranslog.java | 4 +- .../index/translog/fs/FsTranslogFile.java | 82 ++----------- .../translog/fs/SimpleFsTranslogFile.java | 108 ++++++++++++++++++ .../stress/SingleThreadBulkStress.java | 6 +- 4 files changed, 121 insertions(+), 79 deletions(-) create mode 100644 src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 746c75bcca2..d31f21e9efb 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -149,7 +149,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } } try { - newFile = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id))); + newFile = new SimpleFsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id))); } catch (IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); } @@ -184,7 +184,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog location = file; } } - this.trans = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id))); + this.trans = new SimpleFsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id))); } catch (IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); } finally { diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java index 4c4c5c4dccd..b774eba83ae 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java @@ -19,90 +19,26 @@ package org.elasticsearch.index.translog.fs; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogException; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -public class FsTranslogFile { +public interface FsTranslogFile { - private final long id; - private final ShardId shardId; - private final RafReference raf; + long id(); - private final AtomicInteger operationCounter = new AtomicInteger(); + int estimatedNumberOfOperations(); - private final AtomicLong lastPosition = new AtomicLong(0); - private final AtomicLong lastWrittenPosition = new AtomicLong(0); + long translogSizeInBytes(); - private volatile long lastSyncPosition = 0; + Translog.Location add(byte[] data, int from, int size) throws IOException; - public FsTranslogFile(ShardId shardId, long id, RafReference raf) throws IOException { - this.shardId = shardId; - this.id = id; - this.raf = raf; - raf.raf().setLength(0); - } + byte[] read(Translog.Location location) throws IOException; - public long id() { - return this.id; - } + void close(boolean delete); - public int estimatedNumberOfOperations() { - return operationCounter.get(); - } + FsChannelSnapshot snapshot() throws TranslogException; - public long translogSizeInBytes() { - return lastWrittenPosition.get(); - } - - public Translog.Location add(byte[] data, int from, int size) throws IOException { - long position = lastPosition.getAndAdd(size); - raf.channel().write(ByteBuffer.wrap(data, from, size), position); - lastWrittenPosition.getAndAdd(size); - operationCounter.incrementAndGet(); - return new Translog.Location(id, position, size); - } - - public byte[] read(Translog.Location location) throws IOException { - ByteBuffer buffer = ByteBuffer.allocate(location.size); - raf.channel().read(buffer, location.translogLocation); - return buffer.array(); - } - - public void close(boolean delete) { - raf.decreaseRefCount(delete); - } - - /** - * Returns a snapshot on this file, null if it failed to snapshot. - */ - public FsChannelSnapshot snapshot() throws TranslogException { - try { - if (!raf.increaseRefCount()) { - return null; - } - return new FsChannelSnapshot(this.id, raf, lastWrittenPosition.get(), operationCounter.get()); - } catch (Exception e) { - throw new TranslogException(shardId, "Failed to snapshot", e); - } - } - - public void sync() { - try { - // check if we really need to sync here... - long last = lastWrittenPosition.get(); - if (last == lastSyncPosition) { - return; - } - lastSyncPosition = last; - raf.channel().force(false); - } catch (Exception e) { - // ignore - } - } + void sync(); } diff --git a/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java b/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java new file mode 100644 index 00000000000..f9a3619f987 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java @@ -0,0 +1,108 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog.fs; + +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogException; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class SimpleFsTranslogFile implements FsTranslogFile { + + private final long id; + private final ShardId shardId; + private final RafReference raf; + + private final AtomicInteger operationCounter = new AtomicInteger(); + + private final AtomicLong lastPosition = new AtomicLong(0); + private final AtomicLong lastWrittenPosition = new AtomicLong(0); + + private volatile long lastSyncPosition = 0; + + public SimpleFsTranslogFile(ShardId shardId, long id, RafReference raf) throws IOException { + this.shardId = shardId; + this.id = id; + this.raf = raf; + raf.raf().setLength(0); + } + + public long id() { + return this.id; + } + + public int estimatedNumberOfOperations() { + return operationCounter.get(); + } + + public long translogSizeInBytes() { + return lastWrittenPosition.get(); + } + + public Translog.Location add(byte[] data, int from, int size) throws IOException { + long position = lastPosition.getAndAdd(size); + raf.channel().write(ByteBuffer.wrap(data, from, size), position); + lastWrittenPosition.getAndAdd(size); + operationCounter.incrementAndGet(); + return new Translog.Location(id, position, size); + } + + public byte[] read(Translog.Location location) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(location.size); + raf.channel().read(buffer, location.translogLocation); + return buffer.array(); + } + + public void close(boolean delete) { + raf.decreaseRefCount(delete); + } + + /** + * Returns a snapshot on this file, null if it failed to snapshot. + */ + public FsChannelSnapshot snapshot() throws TranslogException { + try { + if (!raf.increaseRefCount()) { + return null; + } + return new FsChannelSnapshot(this.id, raf, lastWrittenPosition.get(), operationCounter.get()); + } catch (Exception e) { + throw new TranslogException(shardId, "Failed to snapshot", e); + } + } + + public void sync() { + try { + // check if we really need to sync here... + long last = lastWrittenPosition.get(); + if (last == lastSyncPosition) { + return; + } + lastSyncPosition = last; + raf.channel().force(false); + } catch (Exception e) { + // ignore + } + } +} diff --git a/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java b/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java index 5303955fa89..b1c8ee1c545 100644 --- a/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java +++ b/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java @@ -32,7 +32,6 @@ import org.elasticsearch.node.Node; import java.io.IOException; import java.util.Random; -import java.util.concurrent.TimeUnit; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; @@ -50,11 +49,10 @@ public class SingleThreadBulkStress { Random random = new Random(); Settings settings = settingsBuilder() - .put("cluster.routing.schedule", 200, TimeUnit.MILLISECONDS) - .put("index.refresh_interval", "-1") + .put("index.refresh_interval", "1s") .put("index.merge.async", true) .put("index.translog.flush_threshold_ops", 5000) - .put("gateway.type", "local") + .put("gateway.type", "none") .put(SETTING_NUMBER_OF_SHARDS, 1) .put(SETTING_NUMBER_OF_REPLICAS, 1) .build();