Translog: When not sync'ing on each operation, buffer writes, closes #1549.
This commit is contained in:
parent
010b910333
commit
a3ca1afed5
|
@ -274,7 +274,7 @@ public class Store extends AbstractIndexShardComponent {
|
||||||
/**
|
/**
|
||||||
* The idea of the store directory is to cache file level meta data, as well as md5 of it
|
* The idea of the store directory is to cache file level meta data, as well as md5 of it
|
||||||
*/
|
*/
|
||||||
protected class StoreDirectory extends Directory implements ForceSyncDirectory {
|
class StoreDirectory extends Directory implements ForceSyncDirectory {
|
||||||
|
|
||||||
private final Directory[] delegates;
|
private final Directory[] delegates;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,169 @@
|
||||||
|
package org.elasticsearch.index.translog.fs;
|
||||||
|
|
||||||
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
import org.elasticsearch.index.translog.Translog;
|
||||||
|
import org.elasticsearch.index.translog.TranslogException;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class BufferingFsTranslogFile implements FsTranslogFile {
|
||||||
|
|
||||||
|
private final long id;
|
||||||
|
private final ShardId shardId;
|
||||||
|
private final RafReference raf;
|
||||||
|
|
||||||
|
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
|
||||||
|
|
||||||
|
private volatile int operationCounter;
|
||||||
|
|
||||||
|
private long lastPosition;
|
||||||
|
private volatile long lastWrittenPosition;
|
||||||
|
|
||||||
|
private volatile long lastSyncPosition = 0;
|
||||||
|
|
||||||
|
private byte[] buffer;
|
||||||
|
private int bufferCount;
|
||||||
|
|
||||||
|
public BufferingFsTranslogFile(ShardId shardId, long id, RafReference raf, int bufferSize) throws IOException {
|
||||||
|
this.shardId = shardId;
|
||||||
|
this.id = id;
|
||||||
|
this.raf = raf;
|
||||||
|
this.buffer = new byte[bufferSize];
|
||||||
|
raf.raf().setLength(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long id() {
|
||||||
|
return this.id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int estimatedNumberOfOperations() {
|
||||||
|
return operationCounter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long translogSizeInBytes() {
|
||||||
|
return lastWrittenPosition;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Translog.Location add(byte[] data, int from, int size) throws IOException {
|
||||||
|
rwl.writeLock().lock();
|
||||||
|
try {
|
||||||
|
operationCounter++;
|
||||||
|
long position = lastPosition;
|
||||||
|
if (size >= buffer.length) {
|
||||||
|
flushBuffer();
|
||||||
|
raf.raf().write(data, from, size);
|
||||||
|
lastWrittenPosition += size;
|
||||||
|
lastPosition += size;
|
||||||
|
return new Translog.Location(id, position, size);
|
||||||
|
}
|
||||||
|
if (size > buffer.length - bufferCount) {
|
||||||
|
flushBuffer();
|
||||||
|
}
|
||||||
|
System.arraycopy(data, from, buffer, bufferCount, size);
|
||||||
|
bufferCount += size;
|
||||||
|
lastPosition += size;
|
||||||
|
return new Translog.Location(id, position, size);
|
||||||
|
} finally {
|
||||||
|
rwl.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void flushBuffer() throws IOException {
|
||||||
|
if (bufferCount > 0) {
|
||||||
|
raf.raf().write(buffer, 0, bufferCount);
|
||||||
|
lastWrittenPosition += bufferCount;
|
||||||
|
bufferCount = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] read(Translog.Location location) throws IOException {
|
||||||
|
rwl.readLock().lock();
|
||||||
|
try {
|
||||||
|
if (location.translogLocation >= lastWrittenPosition) {
|
||||||
|
byte[] data = new byte[location.size];
|
||||||
|
System.arraycopy(buffer, (int) (location.translogLocation - lastWrittenPosition), data, 0, location.size);
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
rwl.readLock().unlock();
|
||||||
|
}
|
||||||
|
ByteBuffer buffer = ByteBuffer.allocate(location.size);
|
||||||
|
raf.channel().read(buffer, location.translogLocation);
|
||||||
|
return buffer.array();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FsChannelSnapshot snapshot() throws TranslogException {
|
||||||
|
rwl.writeLock().lock();
|
||||||
|
try {
|
||||||
|
flushBuffer();
|
||||||
|
if (!raf.increaseRefCount()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return new FsChannelSnapshot(this.id, raf, lastWrittenPosition, operationCounter);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new TranslogException(shardId, "failed to flush", e);
|
||||||
|
} finally {
|
||||||
|
rwl.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sync() {
|
||||||
|
try {
|
||||||
|
// check if we really need to sync here...
|
||||||
|
long last = lastWrittenPosition;
|
||||||
|
if (last == lastSyncPosition) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
lastSyncPosition = last;
|
||||||
|
rwl.writeLock().lock();
|
||||||
|
try {
|
||||||
|
flushBuffer();
|
||||||
|
} finally {
|
||||||
|
rwl.writeLock().unlock();
|
||||||
|
}
|
||||||
|
raf.channel().force(false);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close(boolean delete) {
|
||||||
|
if (!delete) {
|
||||||
|
rwl.writeLock().lock();
|
||||||
|
try {
|
||||||
|
flushBuffer();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new TranslogException(shardId, "failed to close", e);
|
||||||
|
} finally {
|
||||||
|
rwl.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
raf.decreaseRefCount(delete);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reuse(FsTranslogFile other) {
|
||||||
|
if (!(other instanceof BufferingFsTranslogFile)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
rwl.writeLock().lock();
|
||||||
|
try {
|
||||||
|
flushBuffer();
|
||||||
|
this.buffer = ((BufferingFsTranslogFile) other).buffer;
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new TranslogException(shardId, "failed to flush", e);
|
||||||
|
} finally {
|
||||||
|
rwl.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,13 +20,16 @@
|
||||||
package org.elasticsearch.index.translog.fs;
|
package org.elasticsearch.index.translog.fs;
|
||||||
|
|
||||||
import jsr166y.ThreadLocalRandom;
|
import jsr166y.ThreadLocalRandom;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.FileSystemUtils;
|
import org.elasticsearch.common.io.FileSystemUtils;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.CachedStreamOutput;
|
import org.elasticsearch.common.io.stream.CachedStreamOutput;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.index.settings.IndexSettings;
|
import org.elasticsearch.index.settings.IndexSettings;
|
||||||
|
import org.elasticsearch.index.settings.IndexSettingsService;
|
||||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.translog.Translog;
|
import org.elasticsearch.index.translog.Translog;
|
||||||
|
@ -44,29 +47,99 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
*/
|
*/
|
||||||
public class FsTranslog extends AbstractIndexShardComponent implements Translog {
|
public class FsTranslog extends AbstractIndexShardComponent implements Translog {
|
||||||
|
|
||||||
|
static {
|
||||||
|
IndexMetaData.addDynamicSettings(
|
||||||
|
"index.translog.fs.type",
|
||||||
|
"index.translog.fs.buffer_size",
|
||||||
|
"index.translog.fs.transient_buffer_size"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
class ApplySettings implements IndexSettingsService.Listener {
|
||||||
|
@Override
|
||||||
|
public void onRefreshSettings(Settings settings) {
|
||||||
|
int bufferSize = (int) settings.getAsBytesSize("index.translog.fs.buffer_size", new ByteSizeValue(FsTranslog.this.bufferSize)).bytes();
|
||||||
|
if (bufferSize != FsTranslog.this.bufferSize) {
|
||||||
|
logger.info("updating buffer_size from [{}] to [{}]", new ByteSizeValue(FsTranslog.this.bufferSize), new ByteSizeValue(bufferSize));
|
||||||
|
FsTranslog.this.bufferSize = bufferSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
int transientBufferSize = (int) settings.getAsBytesSize("index.translog.fs.transient_buffer_size", new ByteSizeValue(FsTranslog.this.transientBufferSize)).bytes();
|
||||||
|
if (transientBufferSize != FsTranslog.this.transientBufferSize) {
|
||||||
|
logger.info("updating transient_buffer_size from [{}] to [{}]", new ByteSizeValue(FsTranslog.this.transientBufferSize), new ByteSizeValue(transientBufferSize));
|
||||||
|
FsTranslog.this.transientBufferSize = transientBufferSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
FsTranslogFile.Type type = FsTranslogFile.Type.fromString(settings.get("index.translog.fs.type", FsTranslog.this.type.name()));
|
||||||
|
if (type != FsTranslog.this.type) {
|
||||||
|
logger.info("updating type from [{}] to [{}]", FsTranslog.this.type, type);
|
||||||
|
FsTranslog.this.type = type;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final IndexSettingsService indexSettingsService;
|
||||||
|
|
||||||
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
|
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
|
||||||
private final File[] locations;
|
private final File[] locations;
|
||||||
|
|
||||||
private volatile FsTranslogFile current;
|
private volatile FsTranslogFile current;
|
||||||
private volatile FsTranslogFile trans;
|
private volatile FsTranslogFile trans;
|
||||||
|
|
||||||
|
private FsTranslogFile.Type type;
|
||||||
|
|
||||||
private boolean syncOnEachOperation = false;
|
private boolean syncOnEachOperation = false;
|
||||||
|
|
||||||
|
private int bufferSize;
|
||||||
|
private int transientBufferSize;
|
||||||
|
|
||||||
|
private final ApplySettings applySettings = new ApplySettings();
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) {
|
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, NodeEnvironment nodeEnv) {
|
||||||
super(shardId, indexSettings);
|
super(shardId, indexSettings);
|
||||||
|
this.indexSettingsService = indexSettingsService;
|
||||||
File[] shardLocations = nodeEnv.shardLocations(shardId);
|
File[] shardLocations = nodeEnv.shardLocations(shardId);
|
||||||
this.locations = new File[shardLocations.length];
|
this.locations = new File[shardLocations.length];
|
||||||
for (int i = 0; i < shardLocations.length; i++) {
|
for (int i = 0; i < shardLocations.length; i++) {
|
||||||
locations[i] = new File(shardLocations[i], "translog");
|
locations[i] = new File(shardLocations[i], "translog");
|
||||||
FileSystemUtils.mkdirs(locations[i]);
|
FileSystemUtils.mkdirs(locations[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.type = FsTranslogFile.Type.fromString(componentSettings.get("type", FsTranslogFile.Type.BUFFERED.name()));
|
||||||
|
this.bufferSize = (int) componentSettings.getAsBytesSize("buffer_size", ByteSizeValue.parseBytesSizeValue("64k")).bytes();
|
||||||
|
this.transientBufferSize = (int) componentSettings.getAsBytesSize("transient_buffer_size", ByteSizeValue.parseBytesSizeValue("8k")).bytes();
|
||||||
|
|
||||||
|
indexSettingsService.addListener(applySettings);
|
||||||
}
|
}
|
||||||
|
|
||||||
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, File location) {
|
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, File location) {
|
||||||
super(shardId, indexSettings);
|
super(shardId, indexSettings);
|
||||||
|
this.indexSettingsService = null;
|
||||||
this.locations = new File[]{location};
|
this.locations = new File[]{location};
|
||||||
FileSystemUtils.mkdirs(location);
|
FileSystemUtils.mkdirs(location);
|
||||||
|
|
||||||
|
this.type = FsTranslogFile.Type.fromString(componentSettings.get("type", FsTranslogFile.Type.BUFFERED.name()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close(boolean delete) {
|
||||||
|
if (indexSettingsService != null) {
|
||||||
|
indexSettingsService.removeListener(applySettings);
|
||||||
|
}
|
||||||
|
rwl.writeLock().lock();
|
||||||
|
try {
|
||||||
|
FsTranslogFile current1 = this.current;
|
||||||
|
if (current1 != null) {
|
||||||
|
current1.close(delete);
|
||||||
|
}
|
||||||
|
current1 = this.trans;
|
||||||
|
if (current1 != null) {
|
||||||
|
current1.close(delete);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
rwl.writeLock().unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public File[] locations() {
|
public File[] locations() {
|
||||||
|
@ -149,7 +222,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
newFile = new SimpleFsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id)));
|
newFile = type.create(shardId, id, new RafReference(new File(location, "translog-" + id)), bufferSize);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new TranslogException(shardId, "failed to create new translog file", e);
|
throw new TranslogException(shardId, "failed to create new translog file", e);
|
||||||
}
|
}
|
||||||
|
@ -184,7 +257,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
||||||
location = file;
|
location = file;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.trans = new SimpleFsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id)));
|
this.trans = type.create(shardId, id, new RafReference(new File(location, "translog-" + id)), transientBufferSize);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new TranslogException(shardId, "failed to create new translog file", e);
|
throw new TranslogException(shardId, "failed to create new translog file", e);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -205,6 +278,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
||||||
rwl.writeLock().unlock();
|
rwl.writeLock().unlock();
|
||||||
}
|
}
|
||||||
old.close(true);
|
old.close(true);
|
||||||
|
current.reuse(old);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -311,22 +385,10 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
||||||
@Override
|
@Override
|
||||||
public void syncOnEachOperation(boolean syncOnEachOperation) {
|
public void syncOnEachOperation(boolean syncOnEachOperation) {
|
||||||
this.syncOnEachOperation = syncOnEachOperation;
|
this.syncOnEachOperation = syncOnEachOperation;
|
||||||
}
|
if (syncOnEachOperation) {
|
||||||
|
type = FsTranslogFile.Type.SIMPLE;
|
||||||
@Override
|
} else {
|
||||||
public void close(boolean delete) {
|
type = FsTranslogFile.Type.BUFFERED;
|
||||||
rwl.writeLock().lock();
|
|
||||||
try {
|
|
||||||
FsTranslogFile current1 = this.current;
|
|
||||||
if (current1 != null) {
|
|
||||||
current1.close(delete);
|
|
||||||
}
|
|
||||||
current1 = this.trans;
|
|
||||||
if (current1 != null) {
|
|
||||||
current1.close(delete);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
rwl.writeLock().unlock();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
|
|
||||||
package org.elasticsearch.index.translog.fs;
|
package org.elasticsearch.index.translog.fs;
|
||||||
|
|
||||||
|
import org.elasticsearch.ElasticSearchIllegalArgumentException;
|
||||||
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.translog.Translog;
|
import org.elasticsearch.index.translog.Translog;
|
||||||
import org.elasticsearch.index.translog.TranslogException;
|
import org.elasticsearch.index.translog.TranslogException;
|
||||||
|
|
||||||
|
@ -26,6 +28,33 @@ import java.io.IOException;
|
||||||
|
|
||||||
public interface FsTranslogFile {
|
public interface FsTranslogFile {
|
||||||
|
|
||||||
|
public static enum Type {
|
||||||
|
|
||||||
|
SIMPLE() {
|
||||||
|
@Override
|
||||||
|
public FsTranslogFile create(ShardId shardId, long id, RafReference raf, int bufferSize) throws IOException {
|
||||||
|
return new SimpleFsTranslogFile(shardId, id, raf);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
BUFFERED() {
|
||||||
|
@Override
|
||||||
|
public FsTranslogFile create(ShardId shardId, long id, RafReference raf, int bufferSize) throws IOException {
|
||||||
|
return new BufferingFsTranslogFile(shardId, id, raf, bufferSize);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public abstract FsTranslogFile create(ShardId shardId, long id, RafReference raf, int bufferSize) throws IOException;
|
||||||
|
|
||||||
|
public static Type fromString(String type) throws ElasticSearchIllegalArgumentException {
|
||||||
|
if (SIMPLE.name().equalsIgnoreCase(type)) {
|
||||||
|
return SIMPLE;
|
||||||
|
} else if (BUFFERED.name().equalsIgnoreCase(type)) {
|
||||||
|
return BUFFERED;
|
||||||
|
}
|
||||||
|
throw new ElasticSearchIllegalArgumentException("No translog fs type [" + type + "]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
long id();
|
long id();
|
||||||
|
|
||||||
int estimatedNumberOfOperations();
|
int estimatedNumberOfOperations();
|
||||||
|
@ -36,9 +65,11 @@ public interface FsTranslogFile {
|
||||||
|
|
||||||
byte[] read(Translog.Location location) throws IOException;
|
byte[] read(Translog.Location location) throws IOException;
|
||||||
|
|
||||||
void close(boolean delete);
|
void close(boolean delete) throws TranslogException;
|
||||||
|
|
||||||
FsChannelSnapshot snapshot() throws TranslogException;
|
FsChannelSnapshot snapshot() throws TranslogException;
|
||||||
|
|
||||||
|
void reuse(FsTranslogFile other) throws TranslogException;
|
||||||
|
|
||||||
void sync();
|
void sync();
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,4 +105,9 @@ public class SimpleFsTranslogFile implements FsTranslogFile {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Intentionally a no-op in this implementation: the given translog file is
 * ignored and nothing is taken over from it.
 */
@Override
|
||||||
|
public void reuse(FsTranslogFile other) {
|
||||||
|
// nothing to do here
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.indices.analysis.IndicesAnalysisModule;
|
import org.elasticsearch.indices.analysis.IndicesAnalysisModule;
|
||||||
import org.elasticsearch.indices.cache.filter.IndicesNodeFilterCache;
|
import org.elasticsearch.indices.cache.filter.IndicesNodeFilterCache;
|
||||||
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
||||||
import org.elasticsearch.indices.memory.IndexingMemoryBufferController;
|
import org.elasticsearch.indices.memory.IndexingMemoryController;
|
||||||
import org.elasticsearch.indices.query.IndicesQueriesModule;
|
import org.elasticsearch.indices.query.IndicesQueriesModule;
|
||||||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||||
import org.elasticsearch.indices.recovery.RecoverySource;
|
import org.elasticsearch.indices.recovery.RecoverySource;
|
||||||
|
@ -62,7 +62,7 @@ public class IndicesModule extends AbstractModule implements SpawnModules {
|
||||||
bind(RecoverySource.class).asEagerSingleton();
|
bind(RecoverySource.class).asEagerSingleton();
|
||||||
|
|
||||||
bind(IndicesClusterStateService.class).asEagerSingleton();
|
bind(IndicesClusterStateService.class).asEagerSingleton();
|
||||||
bind(IndexingMemoryBufferController.class).asEagerSingleton();
|
bind(IndexingMemoryController.class).asEagerSingleton();
|
||||||
bind(IndicesNodeFilterCache.class).asEagerSingleton();
|
bind(IndicesNodeFilterCache.class).asEagerSingleton();
|
||||||
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
|
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
|
||||||
bind(IndicesTTLService.class).asEagerSingleton();
|
bind(IndicesTTLService.class).asEagerSingleton();
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.indices.memory;
|
package org.elasticsearch.indices.memory;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import org.elasticsearch.ElasticSearchException;
|
import org.elasticsearch.ElasticSearchException;
|
||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
|
@ -40,13 +41,14 @@ import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class IndexingMemoryBufferController extends AbstractLifecycleComponent<IndexingMemoryBufferController> {
|
public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
|
||||||
|
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
|
|
||||||
|
@ -70,7 +72,7 @@ public class IndexingMemoryBufferController extends AbstractLifecycleComponent<I
|
||||||
private final Object mutex = new Object();
|
private final Object mutex = new Object();
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public IndexingMemoryBufferController(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
|
public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
this.indicesService = indicesService;
|
this.indicesService = indicesService;
|
||||||
|
@ -131,6 +133,8 @@ public class IndexingMemoryBufferController extends AbstractLifecycleComponent<I
|
||||||
public void run() {
|
public void run() {
|
||||||
synchronized (mutex) {
|
synchronized (mutex) {
|
||||||
boolean activeInactiveStatusChanges = false;
|
boolean activeInactiveStatusChanges = false;
|
||||||
|
List<IndexShard> activeToInactiveIndexingShards = Lists.newArrayList();
|
||||||
|
List<IndexShard> inactiveToActiveIndexingShards = Lists.newArrayList();
|
||||||
for (IndexService indexService : indicesService) {
|
for (IndexService indexService : indicesService) {
|
||||||
for (IndexShard indexShard : indexService) {
|
for (IndexShard indexShard : indexService) {
|
||||||
long time = threadPool.estimatedTimeInMillis();
|
long time = threadPool.estimatedTimeInMillis();
|
||||||
|
@ -145,27 +149,20 @@ public class IndexingMemoryBufferController extends AbstractLifecycleComponent<I
|
||||||
status.time = time;
|
status.time = time;
|
||||||
}
|
}
|
||||||
// inactive?
|
// inactive?
|
||||||
if (!status.inactive) {
|
if (!status.inactiveIndexing) {
|
||||||
// mark it as inactive only if enough time has passed and there are no ongoing merges going on...
|
// mark it as inactive only if enough time has passed and there are no ongoing merges going on...
|
||||||
if ((time - status.time) > inactiveTime.millis() && indexShard.mergeStats().current() == 0) {
|
if ((time - status.time) > inactiveTime.millis() && indexShard.mergeStats().current() == 0) {
|
||||||
try {
|
|
||||||
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(Engine.INACTIVE_SHARD_INDEXING_BUFFER);
|
|
||||||
} catch (EngineClosedException e) {
|
|
||||||
// ignore
|
|
||||||
continue;
|
|
||||||
} catch (FlushNotAllowedEngineException e) {
|
|
||||||
// ignore
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// inactive for this amount of time, mark it
|
// inactive for this amount of time, mark it
|
||||||
status.inactive = true;
|
activeToInactiveIndexingShards.add(indexShard);
|
||||||
|
status.inactiveIndexing = true;
|
||||||
activeInactiveStatusChanges = true;
|
activeInactiveStatusChanges = true;
|
||||||
logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, Engine.INACTIVE_SHARD_INDEXING_BUFFER);
|
logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, Engine.INACTIVE_SHARD_INDEXING_BUFFER);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (status.inactive) {
|
if (status.inactiveIndexing) {
|
||||||
status.inactive = false;
|
inactiveToActiveIndexingShards.add(indexShard);
|
||||||
|
status.inactiveIndexing = false;
|
||||||
activeInactiveStatusChanges = true;
|
activeInactiveStatusChanges = true;
|
||||||
logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id());
|
logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id());
|
||||||
}
|
}
|
||||||
|
@ -175,6 +172,16 @@ public class IndexingMemoryBufferController extends AbstractLifecycleComponent<I
|
||||||
status.translogNumberOfOperations = translog.estimatedNumberOfOperations();
|
status.translogNumberOfOperations = translog.estimatedNumberOfOperations();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (IndexShard indexShard : activeToInactiveIndexingShards) {
|
||||||
|
// update inactive indexing buffer size
|
||||||
|
try {
|
||||||
|
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(Engine.INACTIVE_SHARD_INDEXING_BUFFER);
|
||||||
|
} catch (EngineClosedException e) {
|
||||||
|
// ignore
|
||||||
|
} catch (FlushNotAllowedEngineException e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
if (activeInactiveStatusChanges) {
|
if (activeInactiveStatusChanges) {
|
||||||
calcAndSetShardIndexingBuffer("shards became active/inactive (indexing wise)");
|
calcAndSetShardIndexingBuffer("shards became active/inactive (indexing wise)");
|
||||||
}
|
}
|
||||||
|
@ -221,7 +228,7 @@ public class IndexingMemoryBufferController extends AbstractLifecycleComponent<I
|
||||||
for (IndexService indexService : indicesService) {
|
for (IndexService indexService : indicesService) {
|
||||||
for (IndexShard indexShard : indexService) {
|
for (IndexShard indexShard : indexService) {
|
||||||
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
|
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
|
||||||
if (status == null || !status.inactive) {
|
if (status == null || !status.inactiveIndexing) {
|
||||||
try {
|
try {
|
||||||
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize);
|
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize);
|
||||||
} catch (EngineClosedException e) {
|
} catch (EngineClosedException e) {
|
||||||
|
@ -247,7 +254,7 @@ public class IndexingMemoryBufferController extends AbstractLifecycleComponent<I
|
||||||
for (IndexService indexService : indicesService) {
|
for (IndexService indexService : indicesService) {
|
||||||
for (IndexShard indexShard : indexService) {
|
for (IndexShard indexShard : indexService) {
|
||||||
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
|
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
|
||||||
if (status == null || !status.inactive) {
|
if (status == null || !status.inactiveIndexing) {
|
||||||
shardsCount++;
|
shardsCount++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -258,7 +265,7 @@ public class IndexingMemoryBufferController extends AbstractLifecycleComponent<I
|
||||||
static class ShardIndexingStatus {
|
static class ShardIndexingStatus {
|
||||||
long translogId = -1;
|
long translogId = -1;
|
||||||
int translogNumberOfOperations = -1;
|
int translogNumberOfOperations = -1;
|
||||||
boolean inactive = false;
|
boolean inactiveIndexing = false;
|
||||||
long time = -1; // contains the first time we saw this shard with no operations done on it
|
long time = -1; // contains the first time we saw this shard with no operations done on it
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -61,7 +61,7 @@ import org.elasticsearch.indices.IndicesModule;
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.indices.cache.filter.IndicesNodeFilterCache;
|
import org.elasticsearch.indices.cache.filter.IndicesNodeFilterCache;
|
||||||
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
||||||
import org.elasticsearch.indices.memory.IndexingMemoryBufferController;
|
import org.elasticsearch.indices.memory.IndexingMemoryController;
|
||||||
import org.elasticsearch.indices.ttl.IndicesTTLService;
|
import org.elasticsearch.indices.ttl.IndicesTTLService;
|
||||||
import org.elasticsearch.jmx.JmxModule;
|
import org.elasticsearch.jmx.JmxModule;
|
||||||
import org.elasticsearch.jmx.JmxService;
|
import org.elasticsearch.jmx.JmxService;
|
||||||
|
@ -176,7 +176,7 @@ public final class InternalNode implements Node {
|
||||||
}
|
}
|
||||||
|
|
||||||
injector.getInstance(IndicesService.class).start();
|
injector.getInstance(IndicesService.class).start();
|
||||||
injector.getInstance(IndexingMemoryBufferController.class).start();
|
injector.getInstance(IndexingMemoryController.class).start();
|
||||||
injector.getInstance(IndicesClusterStateService.class).start();
|
injector.getInstance(IndicesClusterStateService.class).start();
|
||||||
injector.getInstance(IndicesTTLService.class).start();
|
injector.getInstance(IndicesTTLService.class).start();
|
||||||
injector.getInstance(RiversManager.class).start();
|
injector.getInstance(RiversManager.class).start();
|
||||||
|
@ -218,7 +218,7 @@ public final class InternalNode implements Node {
|
||||||
// stop any changes happening as a result of cluster state changes
|
// stop any changes happening as a result of cluster state changes
|
||||||
injector.getInstance(IndicesClusterStateService.class).stop();
|
injector.getInstance(IndicesClusterStateService.class).stop();
|
||||||
// we close indices first, so operations won't be allowed on it
|
// we close indices first, so operations won't be allowed on it
|
||||||
injector.getInstance(IndexingMemoryBufferController.class).stop();
|
injector.getInstance(IndexingMemoryController.class).stop();
|
||||||
injector.getInstance(IndicesTTLService.class).stop();
|
injector.getInstance(IndicesTTLService.class).stop();
|
||||||
injector.getInstance(IndicesService.class).stop();
|
injector.getInstance(IndicesService.class).stop();
|
||||||
// sleep a bit to let operations finish with indices service
|
// sleep a bit to let operations finish with indices service
|
||||||
|
@ -272,7 +272,7 @@ public final class InternalNode implements Node {
|
||||||
injector.getInstance(IndicesClusterStateService.class).close();
|
injector.getInstance(IndicesClusterStateService.class).close();
|
||||||
stopWatch.stop().start("indices");
|
stopWatch.stop().start("indices");
|
||||||
injector.getInstance(IndicesNodeFilterCache.class).close();
|
injector.getInstance(IndicesNodeFilterCache.class).close();
|
||||||
injector.getInstance(IndexingMemoryBufferController.class).close();
|
injector.getInstance(IndexingMemoryController.class).close();
|
||||||
injector.getInstance(IndicesTTLService.class).close();
|
injector.getInstance(IndicesTTLService.class).close();
|
||||||
injector.getInstance(IndicesService.class).close();
|
injector.getInstance(IndicesService.class).close();
|
||||||
stopWatch.stop().start("routing");
|
stopWatch.stop().start("routing");
|
||||||
|
|
|
@ -26,7 +26,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.node.Node;
|
import org.elasticsearch.node.Node;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import static org.elasticsearch.client.Requests.createIndexRequest;
|
import static org.elasticsearch.client.Requests.createIndexRequest;
|
||||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||||
|
@ -43,8 +42,9 @@ public class SingleThreadIndexingStress {
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
Settings settings = settingsBuilder()
|
Settings settings = settingsBuilder()
|
||||||
.put("cluster.routing.schedule", 200, TimeUnit.MILLISECONDS)
|
.put("index.refresh_interval", "1s")
|
||||||
.put("index.engine.robin.refreshInterval", "-1")
|
.put("index.merge.async", true)
|
||||||
|
.put("index.translog.flush_threshold_ops", 5000)
|
||||||
.put("gateway.type", "none")
|
.put("gateway.type", "none")
|
||||||
.put(SETTING_NUMBER_OF_SHARDS, 2)
|
.put(SETTING_NUMBER_OF_SHARDS, 2)
|
||||||
.put(SETTING_NUMBER_OF_REPLICAS, 1)
|
.put(SETTING_NUMBER_OF_REPLICAS, 1)
|
||||||
|
|
|
@ -20,15 +20,19 @@
|
||||||
package org.elasticsearch.test.unit.index.translog;
|
package org.elasticsearch.test.unit.index.translog;
|
||||||
|
|
||||||
import org.apache.lucene.index.Term;
|
import org.apache.lucene.index.Term;
|
||||||
|
import org.elasticsearch.common.BytesHolder;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.translog.Translog;
|
import org.elasticsearch.index.translog.Translog;
|
||||||
|
import org.elasticsearch.index.translog.TranslogStreams;
|
||||||
import org.hamcrest.MatcherAssert;
|
import org.hamcrest.MatcherAssert;
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
import org.testng.annotations.AfterMethod;
|
import org.testng.annotations.AfterMethod;
|
||||||
import org.testng.annotations.BeforeMethod;
|
import org.testng.annotations.BeforeMethod;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
|
||||||
|
@ -54,6 +58,21 @@ public abstract class AbstractSimpleTranslogTests {
|
||||||
|
|
||||||
protected abstract Translog create();
|
protected abstract Translog create();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRead() throws IOException {
|
||||||
|
Translog.Location loc1 = translog.add(new Translog.Create("test", "1", new byte[]{1}));
|
||||||
|
Translog.Location loc2 = translog.add(new Translog.Create("test", "2", new byte[]{2}));
|
||||||
|
assertThat(TranslogStreams.readSource(translog.read(loc1)).source, equalTo(new BytesHolder(new byte[]{1})));
|
||||||
|
assertThat(TranslogStreams.readSource(translog.read(loc2)).source, equalTo(new BytesHolder(new byte[]{2})));
|
||||||
|
translog.sync();
|
||||||
|
assertThat(TranslogStreams.readSource(translog.read(loc1)).source, equalTo(new BytesHolder(new byte[]{1})));
|
||||||
|
assertThat(TranslogStreams.readSource(translog.read(loc2)).source, equalTo(new BytesHolder(new byte[]{2})));
|
||||||
|
Translog.Location loc3 = translog.add(new Translog.Create("test", "2", new byte[]{3}));
|
||||||
|
assertThat(TranslogStreams.readSource(translog.read(loc3)).source, equalTo(new BytesHolder(new byte[]{3})));
|
||||||
|
translog.sync();
|
||||||
|
assertThat(TranslogStreams.readSource(translog.read(loc3)).source, equalTo(new BytesHolder(new byte[]{3})));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTransientTranslog() {
|
public void testTransientTranslog() {
|
||||||
Translog.Snapshot snapshot = translog.snapshot();
|
Translog.Snapshot snapshot = translog.snapshot();
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
/*
|
||||||
|
* Licensed to ElasticSearch and Shay Banon under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. ElasticSearch licenses this
|
||||||
|
* file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.test.unit.index.translog.fs;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.io.FileSystemUtils;
|
||||||
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
|
import org.elasticsearch.index.translog.Translog;
|
||||||
|
import org.elasticsearch.index.translog.fs.FsTranslog;
|
||||||
|
import org.elasticsearch.index.translog.fs.FsTranslogFile;
|
||||||
|
import org.elasticsearch.test.unit.index.translog.AbstractSimpleTranslogTests;
|
||||||
|
import org.testng.annotations.AfterClass;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class FsBufferedTranslogTests extends AbstractSimpleTranslogTests {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Translog create() {
|
||||||
|
return new FsTranslog(shardId,
|
||||||
|
ImmutableSettings.settingsBuilder().put("index.translog.fs.type", FsTranslogFile.Type.BUFFERED.name()).build(),
|
||||||
|
new File("data/fs-translog"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public void cleanup() {
|
||||||
|
FileSystemUtils.deleteRecursively(new File("data/fs-translog"), true);
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,15 +20,15 @@
|
||||||
package org.elasticsearch.test.unit.index.translog.fs;
|
package org.elasticsearch.test.unit.index.translog.fs;
|
||||||
|
|
||||||
import org.elasticsearch.common.io.FileSystemUtils;
|
import org.elasticsearch.common.io.FileSystemUtils;
|
||||||
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
import org.elasticsearch.index.translog.Translog;
|
import org.elasticsearch.index.translog.Translog;
|
||||||
import org.elasticsearch.index.translog.fs.FsTranslog;
|
import org.elasticsearch.index.translog.fs.FsTranslog;
|
||||||
|
import org.elasticsearch.index.translog.fs.FsTranslogFile;
|
||||||
import org.elasticsearch.test.unit.index.translog.AbstractSimpleTranslogTests;
|
import org.elasticsearch.test.unit.index.translog.AbstractSimpleTranslogTests;
|
||||||
import org.testng.annotations.AfterClass;
|
import org.testng.annotations.AfterClass;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
|
||||||
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
@ -36,7 +36,9 @@ public class FsSimpleTranslogTests extends AbstractSimpleTranslogTests {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Translog create() {
|
protected Translog create() {
|
||||||
return new FsTranslog(shardId, EMPTY_SETTINGS, new File("data/fs-translog"));
|
return new FsTranslog(shardId,
|
||||||
|
ImmutableSettings.settingsBuilder().put("index.translog.fs.type", FsTranslogFile.Type.SIMPLE.name()).build(),
|
||||||
|
new File("data/fs-translog"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
|
|
Loading…
Reference in New Issue