abstract away the fs translog file to an interface
This commit is contained in:
parent
9c1280f6eb
commit
26fc9bcb25
|
@ -149,7 +149,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
|||
}
|
||||
}
|
||||
try {
|
||||
newFile = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id)));
|
||||
newFile = new SimpleFsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id)));
|
||||
} catch (IOException e) {
|
||||
throw new TranslogException(shardId, "failed to create new translog file", e);
|
||||
}
|
||||
|
@ -184,7 +184,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
|||
location = file;
|
||||
}
|
||||
}
|
||||
this.trans = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id)));
|
||||
this.trans = new SimpleFsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id)));
|
||||
} catch (IOException e) {
|
||||
throw new TranslogException(shardId, "failed to create new translog file", e);
|
||||
} finally {
|
||||
|
|
|
@ -19,90 +19,26 @@
|
|||
|
||||
package org.elasticsearch.index.translog.fs;
|
||||
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.index.translog.TranslogException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class FsTranslogFile {
|
||||
public interface FsTranslogFile {
|
||||
|
||||
private final long id;
|
||||
private final ShardId shardId;
|
||||
private final RafReference raf;
|
||||
long id();
|
||||
|
||||
private final AtomicInteger operationCounter = new AtomicInteger();
|
||||
int estimatedNumberOfOperations();
|
||||
|
||||
private final AtomicLong lastPosition = new AtomicLong(0);
|
||||
private final AtomicLong lastWrittenPosition = new AtomicLong(0);
|
||||
long translogSizeInBytes();
|
||||
|
||||
private volatile long lastSyncPosition = 0;
|
||||
Translog.Location add(byte[] data, int from, int size) throws IOException;
|
||||
|
||||
public FsTranslogFile(ShardId shardId, long id, RafReference raf) throws IOException {
|
||||
this.shardId = shardId;
|
||||
this.id = id;
|
||||
this.raf = raf;
|
||||
raf.raf().setLength(0);
|
||||
}
|
||||
byte[] read(Translog.Location location) throws IOException;
|
||||
|
||||
public long id() {
|
||||
return this.id;
|
||||
}
|
||||
void close(boolean delete);
|
||||
|
||||
public int estimatedNumberOfOperations() {
|
||||
return operationCounter.get();
|
||||
}
|
||||
FsChannelSnapshot snapshot() throws TranslogException;
|
||||
|
||||
public long translogSizeInBytes() {
|
||||
return lastWrittenPosition.get();
|
||||
}
|
||||
|
||||
public Translog.Location add(byte[] data, int from, int size) throws IOException {
|
||||
long position = lastPosition.getAndAdd(size);
|
||||
raf.channel().write(ByteBuffer.wrap(data, from, size), position);
|
||||
lastWrittenPosition.getAndAdd(size);
|
||||
operationCounter.incrementAndGet();
|
||||
return new Translog.Location(id, position, size);
|
||||
}
|
||||
|
||||
public byte[] read(Translog.Location location) throws IOException {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(location.size);
|
||||
raf.channel().read(buffer, location.translogLocation);
|
||||
return buffer.array();
|
||||
}
|
||||
|
||||
public void close(boolean delete) {
|
||||
raf.decreaseRefCount(delete);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a snapshot on this file, <tt>null</tt> if it failed to snapshot.
|
||||
*/
|
||||
public FsChannelSnapshot snapshot() throws TranslogException {
|
||||
try {
|
||||
if (!raf.increaseRefCount()) {
|
||||
return null;
|
||||
}
|
||||
return new FsChannelSnapshot(this.id, raf, lastWrittenPosition.get(), operationCounter.get());
|
||||
} catch (Exception e) {
|
||||
throw new TranslogException(shardId, "Failed to snapshot", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void sync() {
|
||||
try {
|
||||
// check if we really need to sync here...
|
||||
long last = lastWrittenPosition.get();
|
||||
if (last == lastSyncPosition) {
|
||||
return;
|
||||
}
|
||||
lastSyncPosition = last;
|
||||
raf.channel().force(false);
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
void sync();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.translog.fs;
|
||||
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.index.translog.TranslogException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class SimpleFsTranslogFile implements FsTranslogFile {
|
||||
|
||||
private final long id;
|
||||
private final ShardId shardId;
|
||||
private final RafReference raf;
|
||||
|
||||
private final AtomicInteger operationCounter = new AtomicInteger();
|
||||
|
||||
private final AtomicLong lastPosition = new AtomicLong(0);
|
||||
private final AtomicLong lastWrittenPosition = new AtomicLong(0);
|
||||
|
||||
private volatile long lastSyncPosition = 0;
|
||||
|
||||
public SimpleFsTranslogFile(ShardId shardId, long id, RafReference raf) throws IOException {
|
||||
this.shardId = shardId;
|
||||
this.id = id;
|
||||
this.raf = raf;
|
||||
raf.raf().setLength(0);
|
||||
}
|
||||
|
||||
public long id() {
|
||||
return this.id;
|
||||
}
|
||||
|
||||
public int estimatedNumberOfOperations() {
|
||||
return operationCounter.get();
|
||||
}
|
||||
|
||||
public long translogSizeInBytes() {
|
||||
return lastWrittenPosition.get();
|
||||
}
|
||||
|
||||
public Translog.Location add(byte[] data, int from, int size) throws IOException {
|
||||
long position = lastPosition.getAndAdd(size);
|
||||
raf.channel().write(ByteBuffer.wrap(data, from, size), position);
|
||||
lastWrittenPosition.getAndAdd(size);
|
||||
operationCounter.incrementAndGet();
|
||||
return new Translog.Location(id, position, size);
|
||||
}
|
||||
|
||||
public byte[] read(Translog.Location location) throws IOException {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(location.size);
|
||||
raf.channel().read(buffer, location.translogLocation);
|
||||
return buffer.array();
|
||||
}
|
||||
|
||||
public void close(boolean delete) {
|
||||
raf.decreaseRefCount(delete);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a snapshot on this file, <tt>null</tt> if it failed to snapshot.
|
||||
*/
|
||||
public FsChannelSnapshot snapshot() throws TranslogException {
|
||||
try {
|
||||
if (!raf.increaseRefCount()) {
|
||||
return null;
|
||||
}
|
||||
return new FsChannelSnapshot(this.id, raf, lastWrittenPosition.get(), operationCounter.get());
|
||||
} catch (Exception e) {
|
||||
throw new TranslogException(shardId, "Failed to snapshot", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void sync() {
|
||||
try {
|
||||
// check if we really need to sync here...
|
||||
long last = lastWrittenPosition.get();
|
||||
if (last == lastSyncPosition) {
|
||||
return;
|
||||
}
|
||||
lastSyncPosition = last;
|
||||
raf.channel().force(false);
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
|
@ -32,7 +32,6 @@ import org.elasticsearch.node.Node;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||
|
@ -50,11 +49,10 @@ public class SingleThreadBulkStress {
|
|||
Random random = new Random();
|
||||
|
||||
Settings settings = settingsBuilder()
|
||||
.put("cluster.routing.schedule", 200, TimeUnit.MILLISECONDS)
|
||||
.put("index.refresh_interval", "-1")
|
||||
.put("index.refresh_interval", "1s")
|
||||
.put("index.merge.async", true)
|
||||
.put("index.translog.flush_threshold_ops", 5000)
|
||||
.put("gateway.type", "local")
|
||||
.put("gateway.type", "none")
|
||||
.put(SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(SETTING_NUMBER_OF_REPLICAS, 1)
|
||||
.build();
|
||||
|
|
Loading…
Reference in New Issue