Remove Translog interface

We only have one implementation of this interface which makes not much
sense. This commit removes the abstraction.
This commit is contained in:
Simon Willnauer 2015-05-05 17:18:50 +02:00
parent fafd67e1ae
commit 200174aa37
23 changed files with 715 additions and 832 deletions

View File

@ -29,7 +29,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArray;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.codec.CodecService;
@ -41,11 +40,9 @@ import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.indices.IndicesWarmer; import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;

View File

@ -18,8 +18,6 @@
*/ */
package org.elasticsearch.index.engine; package org.elasticsearch.index.engine;
import org.elasticsearch.index.translog.fs.FsTranslog;
/** /**
* Simple Engine Factory * Simple Engine Factory
*/ */

View File

@ -19,7 +19,6 @@
package org.elasticsearch.index.engine; package org.elasticsearch.index.engine;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.lucene.index.*; import org.apache.lucene.index.*;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer; import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
@ -50,7 +49,6 @@ import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery; import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.indices.IndicesWarmer; import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -77,7 +75,7 @@ public class InternalEngine extends Engine {
private final ShardIndexingService indexingService; private final ShardIndexingService indexingService;
@Nullable @Nullable
private final IndicesWarmer warmer; private final IndicesWarmer warmer;
private final FsTranslog translog; private final Translog translog;
private final MergePolicyProvider mergePolicyProvider; private final MergePolicyProvider mergePolicyProvider;
private final MergeSchedulerProvider mergeScheduler; private final MergeSchedulerProvider mergeScheduler;
@ -109,7 +107,7 @@ public class InternalEngine extends Engine {
this.versionMap = new LiveVersionMap(); this.versionMap = new LiveVersionMap();
store.incRef(); store.incRef();
IndexWriter writer = null; IndexWriter writer = null;
FsTranslog translog = null; Translog translog = null;
SearcherManager manager = null; SearcherManager manager = null;
boolean success = false; boolean success = false;
try { try {
@ -129,7 +127,7 @@ public class InternalEngine extends Engine {
try { try {
writer = createWriter(); writer = createWriter();
indexWriter = writer; indexWriter = writer;
translog = new FsTranslog(engineConfig.getShardId(), engineConfig.getIndesSettingService(), engineConfig.getBigArrays(), engineConfig.getTranslogPath(), engineConfig.getThreadPool()); translog = new Translog(engineConfig.getShardId(), engineConfig.getIndesSettingService(), engineConfig.getBigArrays(), engineConfig.getTranslogPath(), engineConfig.getThreadPool());
committedTranslogId = loadCommittedTranslogId(writer, translog); committedTranslogId = loadCommittedTranslogId(writer, translog);
} catch (IOException e) { } catch (IOException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e); throw new EngineCreationFailureException(shardId, "failed to create engine", e);

View File

@ -18,8 +18,6 @@
*/ */
package org.elasticsearch.index.engine; package org.elasticsearch.index.engine;
import org.elasticsearch.index.translog.fs.FsTranslog;
public class InternalEngineFactory implements EngineFactory { public class InternalEngineFactory implements EngineFactory {
@Override @Override
public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery) { public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery) {

View File

@ -38,7 +38,7 @@ import org.elasticsearch.index.search.slowlog.ShardSlowLogSearchService;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.index.translog.fs.FsTranslog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesWarmer; import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.ttl.IndicesTTLService; import org.elasticsearch.indices.ttl.IndicesTTLService;
@ -64,7 +64,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_ALLOCATION); indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_ALLOCATION);
indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION); indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION);
indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION); indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION);
indexDynamicSettings.addDynamicSetting(FsTranslog.INDEX_TRANSLOG_FS_TYPE); indexDynamicSettings.addDynamicSetting(Translog.INDEX_TRANSLOG_FS_TYPE);
indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, Validator.NON_NEGATIVE_INTEGER); indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, Validator.NON_NEGATIVE_INTEGER);
indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS); indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS);
indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_READ_ONLY); indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_READ_ONLY);

View File

@ -17,15 +17,12 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.translog.fs; package org.elasticsearch.index.translog;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.index.translog.TranslogStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
@ -33,7 +30,7 @@ import java.nio.ByteBuffer;
/** /**
*/ */
public final class BufferingFsTranslogFile extends FsTranslogFile { public final class BufferingTranslogFile extends TranslogFile {
private volatile int operationCounter; private volatile int operationCounter;
private volatile long lastPosition; private volatile long lastPosition;
@ -45,7 +42,7 @@ public final class BufferingFsTranslogFile extends FsTranslogFile {
private int bufferCount; private int bufferCount;
private WrapperOutputStream bufferOs = new WrapperOutputStream(); private WrapperOutputStream bufferOs = new WrapperOutputStream();
public BufferingFsTranslogFile(ShardId shardId, long id, ChannelReference channelReference, int bufferSize) throws IOException { public BufferingTranslogFile(ShardId shardId, long id, ChannelReference channelReference, int bufferSize) throws IOException {
super(shardId, id, channelReference); super(shardId, id, channelReference);
this.buffer = new byte[bufferSize]; this.buffer = new byte[bufferSize];
final TranslogStream stream = this.channelReference.stream(); final TranslogStream stream = this.channelReference.stream();
@ -113,11 +110,11 @@ public final class BufferingFsTranslogFile extends FsTranslogFile {
Channels.readFromFileChannelWithEofException(channelReference.channel(), position, targetBuffer); Channels.readFromFileChannelWithEofException(channelReference.channel(), position, targetBuffer);
} }
public FsChannelImmutableReader immutableReader() throws TranslogException { public ChannelImmutableReader immutableReader() throws TranslogException {
if (channelReference.tryIncRef()) { if (channelReference.tryIncRef()) {
try (ReleasableLock lock = writeLock.acquire()) { try (ReleasableLock lock = writeLock.acquire()) {
flushBuffer(); flushBuffer();
FsChannelImmutableReader reader = new FsChannelImmutableReader(this.id, channelReference, lastWrittenPosition, operationCounter); ChannelImmutableReader reader = new ChannelImmutableReader(this.id, channelReference, lastWrittenPosition, operationCounter);
channelReference.incRef(); // for new reader channelReference.incRef(); // for new reader
return reader; return reader;
} catch (Exception e) { } catch (Exception e) {
@ -157,14 +154,14 @@ public final class BufferingFsTranslogFile extends FsTranslogFile {
} }
@Override @Override
public void reuse(FsTranslogFile other) { public void reuse(TranslogFile other) {
if (!(other instanceof BufferingFsTranslogFile)) { if (!(other instanceof BufferingTranslogFile)) {
return; return;
} }
try (ReleasableLock lock = writeLock.acquire()) { try (ReleasableLock lock = writeLock.acquire()) {
try { try {
flushBuffer(); flushBuffer();
this.buffer = ((BufferingFsTranslogFile) other).buffer; this.buffer = ((BufferingTranslogFile) other).buffer;
} catch (IOException e) { } catch (IOException e) {
throw new TranslogException(shardId, "failed to flush", e); throw new TranslogException(shardId, "failed to flush", e);
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.translog.fs; package org.elasticsearch.index.translog;
import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.io.Channels;
@ -28,7 +28,7 @@ import java.nio.ByteBuffer;
/** /**
* a channel reader which is fixed in length * a channel reader which is fixed in length
*/ */
public final class FsChannelImmutableReader extends FsChannelReader { public final class ChannelImmutableReader extends ChannelReader {
private final int totalOperations; private final int totalOperations;
private final long length; private final long length;
@ -37,17 +37,17 @@ public final class FsChannelImmutableReader extends FsChannelReader {
* Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point * Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point
* at the end of the last operation in this snapshot. * at the end of the last operation in this snapshot.
*/ */
public FsChannelImmutableReader(long id, ChannelReference channelReference, long length, int totalOperations) { public ChannelImmutableReader(long id, ChannelReference channelReference, long length, int totalOperations) {
super(id, channelReference); super(id, channelReference);
this.length = length; this.length = length;
this.totalOperations = totalOperations; this.totalOperations = totalOperations;
} }
public FsChannelImmutableReader clone() { public ChannelImmutableReader clone() {
if (channelReference.tryIncRef()) { if (channelReference.tryIncRef()) {
try { try {
FsChannelImmutableReader reader = new FsChannelImmutableReader(id, channelReference, length, totalOperations); ChannelImmutableReader reader = new ChannelImmutableReader(id, channelReference, length, totalOperations);
channelReference.incRef(); // for the new object channelReference.incRef(); // for the new object
return reader; return reader;
} finally { } finally {
@ -80,7 +80,7 @@ public final class FsChannelImmutableReader extends FsChannelReader {
} }
@Override @Override
public FsChannelSnapshot newSnapshot() { public ChannelSnapshot newSnapshot() {
return new FsChannelSnapshot(clone()); return new ChannelSnapshot(clone());
} }
} }

View File

@ -17,11 +17,10 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.translog.fs; package org.elasticsearch.index.translog;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.index.translog.Translog;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -32,7 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* A base class for all classes that allows reading ops from translog files * A base class for all classes that allows reading ops from translog files
*/ */
public abstract class FsChannelReader implements Closeable, Comparable<FsChannelReader> { public abstract class ChannelReader implements Closeable, Comparable<ChannelReader> {
public static final int UNKNOWN_OP_COUNT = -1; public static final int UNKNOWN_OP_COUNT = -1;
@ -41,7 +40,7 @@ public abstract class FsChannelReader implements Closeable, Comparable<FsChannel
protected final FileChannel channel; protected final FileChannel channel;
protected final AtomicBoolean closed = new AtomicBoolean(false); protected final AtomicBoolean closed = new AtomicBoolean(false);
public FsChannelReader(long id, ChannelReference channelReference) { public ChannelReader(long id, ChannelReference channelReference) {
this.id = id; this.id = id;
this.channelReference = channelReference; this.channelReference = channelReference;
this.channel = channelReference.channel(); this.channel = channelReference.channel();
@ -106,7 +105,7 @@ public abstract class FsChannelReader implements Closeable, Comparable<FsChannel
abstract protected void readBytes(ByteBuffer buffer, long position) throws IOException; abstract protected void readBytes(ByteBuffer buffer, long position) throws IOException;
/** create snapshot for this channel */ /** create snapshot for this channel */
abstract public FsChannelSnapshot newSnapshot(); abstract public ChannelSnapshot newSnapshot();
@Override @Override
public void close() throws IOException { public void close() throws IOException {
@ -125,7 +124,7 @@ public abstract class FsChannelReader implements Closeable, Comparable<FsChannel
} }
@Override @Override
public int compareTo(FsChannelReader o) { public int compareTo(ChannelReader o) {
return Long.compare(translogId(), o.translogId()); return Long.compare(translogId(), o.translogId());
} }
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.translog.fs; package org.elasticsearch.index.translog;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;

View File

@ -16,11 +16,10 @@
* specific language governing permissions and limitations * specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.elasticsearch.index.translog.fs; package org.elasticsearch.index.translog;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.index.translog.Translog;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -29,17 +28,17 @@ import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* an implementation of {@link org.elasticsearch.index.translog.Translog.Snapshot}, wrapping * an implementation of {@link org.elasticsearch.index.translog.Translog.Snapshot}, wrapping
* a {@link FsChannelReader}. This class is NOT thread-safe. * a {@link ChannelReader}. This class is NOT thread-safe.
*/ */
public class FsChannelSnapshot implements Closeable { public class ChannelSnapshot implements Closeable {
protected final FsChannelReader reader; protected final ChannelReader reader;
protected final AtomicBoolean closed = new AtomicBoolean(false); protected final AtomicBoolean closed = new AtomicBoolean(false);
// we use an atomic long to allow passing it by reference :( // we use an atomic long to allow passing it by reference :(
protected long position; protected long position;
public FsChannelSnapshot(FsChannelReader reader) { public ChannelSnapshot(ChannelReader reader) {
this.reader = reader; this.reader = reader;
this.position = reader.firstPosition(); this.position = reader.firstPosition();
} }

View File

@ -17,26 +17,24 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.translog.fs; package org.elasticsearch.index.translog;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
public final class SimpleFsTranslogFile extends FsTranslogFile { public final class SimpleTranslogFile extends TranslogFile {
private volatile int operationCounter = 0; private volatile int operationCounter = 0;
private volatile long lastPosition = 0; private volatile long lastPosition = 0;
private volatile long lastWrittenPosition = 0; private volatile long lastWrittenPosition = 0;
private volatile long lastSyncPosition = 0; private volatile long lastSyncPosition = 0;
public SimpleFsTranslogFile(ShardId shardId, long id, ChannelReference channelReference) throws IOException { public SimpleTranslogFile(ShardId shardId, long id, ChannelReference channelReference) throws IOException {
super(shardId, id, channelReference); super(shardId, id, channelReference);
int headerSize = this.channelReference.stream().writeHeader(channelReference.channel()); int headerSize = this.channelReference.stream().writeHeader(channelReference.channel());
this.lastPosition += headerSize; this.lastPosition += headerSize;
@ -82,10 +80,10 @@ public final class SimpleFsTranslogFile extends FsTranslogFile {
} }
} }
public FsChannelImmutableReader immutableReader() throws TranslogException { public ChannelImmutableReader immutableReader() throws TranslogException {
if (channelReference.tryIncRef()) { if (channelReference.tryIncRef()) {
try (ReleasableLock lock = writeLock.acquire()) { try (ReleasableLock lock = writeLock.acquire()) {
FsChannelImmutableReader reader = new FsChannelImmutableReader(this.id, channelReference, lastWrittenPosition, operationCounter); ChannelImmutableReader reader = new ChannelImmutableReader(this.id, channelReference, lastWrittenPosition, operationCounter);
channelReference.incRef(); // for the new object channelReference.incRef(); // for the new object
return reader; return reader;
} finally { } finally {
@ -115,7 +113,7 @@ public final class SimpleFsTranslogFile extends FsTranslogFile {
} }
@Override @Override
public void reuse(FsTranslogFile other) { public void reuse(TranslogFile other) {
// nothing to do there // nothing to do there
} }

View File

@ -19,103 +19,281 @@
package org.elasticsearch.index.translog; package org.elasticsearch.index.translog;
import com.google.common.collect.Iterables;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.RamUsageEstimator; import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShardComponent; import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.*;
import java.util.Arrays; import java.util.*;
import java.util.Collection; import java.util.concurrent.ScheduledFuture;
import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/** /**
* *
*/ */
public interface Translog extends IndexShardComponent { public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
static ByteSizeValue INACTIVE_SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("1kb");
public static ByteSizeValue INACTIVE_SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("1kb");
public static final String TRANSLOG_ID_KEY = "translog_id"; public static final String TRANSLOG_ID_KEY = "translog_id";
public static final String INDEX_TRANSLOG_FS_TYPE = "index.translog.fs.type";
public static final String INDEX_TRANSLOG_BUFFER_SIZE = "index.translog.fs.buffer_size";
public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval";
public static final String TRANSLOG_FILE_PREFIX = "translog-";
static final Pattern PARSE_ID_PATTERN = Pattern.compile(TRANSLOG_FILE_PREFIX + "(\\d+)(\\.recovering)?$");
private final TimeValue syncInterval;
private volatile ScheduledFuture<?> syncScheduler;
void updateBuffer(ByteSizeValue bufferSize);
/** // this is a concurrent set and is not protected by any of the locks. The main reason
* Returns the id of the current transaction log. // is that is being accessed by two separate classes (additions & reading are done by FsTranslog, remove by FsView when closed)
*/ private final Set<FsView> outstandingViews = ConcurrentCollections.newConcurrentSet();
long currentId();
/**
* Returns the number of operations in the transaction files that aren't committed to lucene..
* Note: may return -1 if unknown
*/
int totalOperations();
/** class ApplySettings implements IndexSettingsService.Listener {
* Returns the size in bytes of the translog files that aren't committed to lucene. @Override
*/ public void onRefreshSettings(Settings settings) {
long sizeInBytes(); TranslogFile.Type type = TranslogFile.Type.fromString(settings.get(INDEX_TRANSLOG_FS_TYPE, Translog.this.type.name()));
if (type != Translog.this.type) {
logger.info("updating type from [{}] to [{}]", Translog.this.type, type);
Translog.this.type = type;
}
}
}
/** private final IndexSettingsService indexSettingsService;
* Creates a new transaction log file internally. That new file will be visible to all outstanding views. private final BigArrays bigArrays;
* The id of the new translog file is returned. private final ThreadPool threadPool;
*/
long newTranslog() throws TranslogException, IOException;
/** protected final ReleasableLock readLock;
* Adds a create operation to the transaction log. protected final ReleasableLock writeLock;
*/
Location add(Operation operation) throws TranslogException;
Translog.Operation read(Location location); private final Path location;
/** // protected by the write lock
* Snapshots the current transaction log allowing to safely iterate over the snapshot. private long idGenerator = 1;
* Snapshots are fixed in time and will not be updated with future operations. private TranslogFile current;
*/ // ordered by age
Snapshot newSnapshot() throws TranslogException; private final List<ChannelImmutableReader> uncommittedTranslogs = new ArrayList<>();
private long lastCommittedTranslogId = -1; // -1 is safe as it will not cause an translog deletion.
/** private TranslogFile.Type type;
* Returns a view into the current translog that is guaranteed to retain all current operations
* while receiving future ones as well
*/
View newView();
/** private boolean syncOnEachOperation = false;
* Sync's the translog.
*/
void sync() throws IOException;
boolean syncNeeded(); private volatile int bufferSize;
void syncOnEachOperation(boolean syncOnEachOperation); private final ApplySettings applySettings = new ApplySettings();
private final AtomicBoolean closed = new AtomicBoolean();
public Translog(ShardId shardId, IndexSettingsService indexSettingsService,
BigArrays bigArrays, Path location, ThreadPool threadPool) throws IOException {
this(shardId, indexSettingsService.getSettings(), indexSettingsService, bigArrays, location, threadPool);
}
public Translog(ShardId shardId, @IndexSettings Settings indexSettings,
BigArrays bigArrays, Path location) throws IOException {
this(shardId, indexSettings, null, bigArrays, location, null);
}
private Translog(ShardId shardId, @IndexSettings Settings indexSettings, @Nullable IndexSettingsService indexSettingsService,
BigArrays bigArrays, Path location, @Nullable ThreadPool threadPool) throws IOException {
super(shardId, indexSettings);
ReadWriteLock rwl = new ReentrantReadWriteLock();
readLock = new ReleasableLock(rwl.readLock());
writeLock = new ReleasableLock(rwl.writeLock());
this.indexSettingsService = indexSettingsService;
this.bigArrays = bigArrays;
this.location = location;
Files.createDirectories(this.location);
this.threadPool = threadPool;
this.type = TranslogFile.Type.fromString(indexSettings.get(INDEX_TRANSLOG_FS_TYPE, TranslogFile.Type.BUFFERED.name()));
this.bufferSize = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, ByteSizeValue.parseBytesSizeValue("64k")).bytes(); // Not really interesting, updated by IndexingMemoryController...
syncInterval = indexSettings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
if (syncInterval.millis() > 0 && threadPool != null) {
syncOnEachOperation(false);
syncScheduler = threadPool.schedule(syncInterval, ThreadPool.Names.SAME, new Sync());
} else if (syncInterval.millis() == 0) {
syncOnEachOperation(true);
}
if (indexSettingsService != null) {
indexSettingsService.addListener(applySettings);
}
try {
recoverFromFiles();
// now that we know which files are there, create a new current one.
current = createTranslogFile(null);
} catch (Throwable t) {
// close the opened translog files if we fail to create a new translog...
IOUtils.closeWhileHandlingException(uncommittedTranslogs);
throw t;
}
}
/** recover all translog files found on disk */
private void recoverFromFiles() throws IOException {
boolean success = false;
ArrayList<ChannelImmutableReader> foundTranslogs = new ArrayList<>();
try (ReleasableLock lock = writeLock.acquire()) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(location, TRANSLOG_FILE_PREFIX + "[0-9]*")) {
for (Path file : stream) {
final long id = parseIdFromFileName(file);
if (id < 0) {
throw new TranslogException(shardId, "failed to parse id from file name matching pattern " + file);
}
idGenerator = Math.max(idGenerator, id + 1);
final ChannelReference raf = new InternalChannelReference(id, location.resolve(getFilename(id)), StandardOpenOption.READ);
foundTranslogs.add(new ChannelImmutableReader(id, raf, raf.channel().size(), ChannelReader.UNKNOWN_OP_COUNT));
logger.debug("found local translog with id [{}]", id);
}
}
CollectionUtil.timSort(foundTranslogs);
uncommittedTranslogs.addAll(foundTranslogs);
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(foundTranslogs);
}
}
}
/* extracts the translog id from a file name. returns -1 upon failure */
public static long parseIdFromFileName(Path translogFile) {
final String fileName = translogFile.getFileName().toString();
final Matcher matcher = PARSE_ID_PATTERN.matcher(fileName);
if (matcher.matches()) {
try {
return Long.parseLong(matcher.group(1));
} catch (NumberFormatException e) {
throw new ElasticsearchException("number formatting issue in a file that passed PARSE_ID_PATTERN: " + fileName + "]", e);
}
}
return -1;
}
public void updateBuffer(ByteSizeValue bufferSize) {
this.bufferSize = bufferSize.bytesAsInt();
try (ReleasableLock lock = writeLock.acquire()) {
current.updateBufferSize(this.bufferSize);
}
}
boolean isOpen() {
return closed.get() == false;
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
if (indexSettingsService != null) {
indexSettingsService.removeListener(applySettings);
}
try (ReleasableLock lock = writeLock.acquire()) {
try {
IOUtils.close(this.current);
} finally {
IOUtils.close(uncommittedTranslogs);
}
} finally {
FutureUtils.cancel(syncScheduler);
logger.debug("translog closed");
}
}
}
/** /**
* Returns all translog locations as absolute paths. * Returns all translog locations as absolute paths.
* These paths don't contain actual translog files they are * These paths don't contain actual translog files they are
* directories holding the transaction logs. * directories holding the transaction logs.
*/ */
public Path location(); public Path location() {
return location;
}
/** /**
* return stats * Returns the id of the current transaction log.
*/ */
TranslogStats stats(); public long currentId() {
try (ReleasableLock lock = readLock.acquire()) {
return current.translogId();
}
}
/**
* Returns the number of operations in the transaction files that aren't committed to lucene..
* Note: may return -1 if unknown
*/
public int totalOperations() {
int ops = 0;
try (ReleasableLock lock = readLock.acquire()) {
ops += current.totalOperations();
for (ChannelReader translog : uncommittedTranslogs) {
int tops = translog.totalOperations();
if (tops == ChannelReader.UNKNOWN_OP_COUNT) {
return ChannelReader.UNKNOWN_OP_COUNT;
}
ops += tops;
}
}
return ops;
}
/**
* Returns the size in bytes of the translog files that aren't committed to lucene.
*/
public long sizeInBytes() {
long size = 0;
try (ReleasableLock lock = readLock.acquire()) {
size += current.sizeInBytes();
for (ChannelReader translog : uncommittedTranslogs) {
size += translog.sizeInBytes();
}
}
return size;
}
/** /**
* notifies the translog that translogId was committed as part of the commit data in lucene, together * notifies the translog that translogId was committed as part of the commit data in lucene, together
@ -123,10 +301,391 @@ public interface Translog extends IndexShardComponent {
* *
* @throws FileNotFoundException if the given translog id can not be found. * @throws FileNotFoundException if the given translog id can not be found.
*/ */
void markCommitted(long translogId) throws FileNotFoundException; public void markCommitted(final long translogId) throws FileNotFoundException {
try (ReleasableLock lock = writeLock.acquire()) {
logger.trace("updating translogs on commit of [{}]", translogId);
if (translogId < lastCommittedTranslogId) {
throw new IllegalArgumentException("committed translog id can only go up (current ["
+ lastCommittedTranslogId + "], got [" + translogId + "]");
}
boolean found = false;
if (current.translogId() == translogId) {
found = true;
} else {
if (translogId > current.translogId()) {
throw new IllegalArgumentException("committed translog id must be lower or equal to current id (current ["
+ current.translogId() + "], got [" + translogId + "]");
}
}
if (found == false) {
// try to find it in uncommittedTranslogs
for (ChannelImmutableReader translog : uncommittedTranslogs) {
if (translog.translogId() == translogId) {
found = true;
break;
}
}
}
if (found == false) {
ArrayList<Long> currentIds = new ArrayList<>();
for (ChannelReader translog : Iterables.concat(uncommittedTranslogs, Collections.singletonList(current))) {
currentIds.add(translog.translogId());
}
throw new FileNotFoundException("committed translog id can not be found (current ["
+ Strings.collectionToCommaDelimitedString(currentIds) + "], got [" + translogId + "]");
}
lastCommittedTranslogId = translogId;
while (uncommittedTranslogs.isEmpty() == false && uncommittedTranslogs.get(0).translogId() < translogId) {
ChannelReader old = uncommittedTranslogs.remove(0);
logger.trace("removed [{}] from uncommitted translog list", old.translogId());
try {
old.close();
} catch (IOException e) {
logger.error("failed to closed old translog [{}] (committed id [{}])", e, old, translogId);
}
}
}
}
/**
* Creates a new transaction log file internally. That new file will be visible to all outstanding views.
* The id of the new translog file is returned.
*/
public long newTranslog() throws TranslogException, IOException {
try (ReleasableLock lock = writeLock.acquire()) {
final TranslogFile old = current;
final TranslogFile newFile = createTranslogFile(old);
current = newFile;
ChannelImmutableReader reader = old.immutableReader();
uncommittedTranslogs.add(reader);
// notify all outstanding views of the new translog (no views are created now as
// we hold a write lock).
for (FsView view : outstandingViews) {
view.onNewTranslog(old.immutableReader(), current.reader());
}
IOUtils.close(old);
logger.trace("current translog set to [{}]", current.translogId());
return current.translogId();
}
}
protected TranslogFile createTranslogFile(@Nullable TranslogFile reuse) throws IOException {
TranslogFile newFile;
long size = Long.MAX_VALUE;
try {
long id = idGenerator++;
newFile = type.create(shardId, id, new InternalChannelReference(id, location.resolve(getFilename(id)), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW), bufferSize);
} catch (IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
}
if (reuse != null) {
newFile.reuse(reuse);
}
return newFile;
}
static class Location implements Accountable { /**
* Read the Operation object from the given location, returns null if the
* Operation could not be read.
*/
public Translog.Operation read(Location location) {
try (ReleasableLock lock = readLock.acquire()) {
ChannelReader reader = null;
if (current.translogId() == location.translogId) {
reader = current;
} else {
for (ChannelReader translog : uncommittedTranslogs) {
if (translog.translogId() == location.translogId) {
reader = translog;
break;
}
}
}
return reader == null ? null : reader.read(location);
} catch (IOException e) {
throw new ElasticsearchException("failed to read source from translog location " + location, e);
}
}
/**
* Adds a create operation to the transaction log.
*/
public Location add(Operation operation) throws TranslogException {
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
try {
TranslogStreams.writeTranslogOperation(out, operation);
ReleasablePagedBytesReference bytes = out.bytes();
try (ReleasableLock lock = readLock.acquire()) {
Location location = current.add(bytes);
if (syncOnEachOperation) {
current.sync();
}
assert current.assertBytesAtLocation(location, bytes);
return location;
}
} catch (Throwable e) {
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
} finally {
Releasables.close(out.bytes());
}
}
/**
* Snapshots the current transaction log allowing to safely iterate over the snapshot.
* Snapshots are fixed in time and will not be updated with future operations.
*/
public Snapshot newSnapshot() {
try (ReleasableLock lock = readLock.acquire()) {
// leave one place for current.
final ChannelReader[] readers = uncommittedTranslogs.toArray(new ChannelReader[uncommittedTranslogs.size() + 1]);
readers[readers.length - 1] = current;
return createdSnapshot(readers);
}
}
private Snapshot createdSnapshot(ChannelReader... translogs) {
ArrayList<ChannelSnapshot> channelSnapshots = new ArrayList<>();
boolean success = false;
try {
for (ChannelReader translog : translogs) {
channelSnapshots.add(translog.newSnapshot());
}
Snapshot snapshot = new TranslogSnapshot(channelSnapshots, logger);
success = true;
return snapshot;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(channelSnapshots);
}
}
}
/**
* Returns a view into the current translog that is guaranteed to retain all current operations
* while receiving future ones as well
*/
public Translog.View newView() {
// we need to acquire the read lock to make sure new translog is created
// and will be missed by the view we're making
try (ReleasableLock lock = readLock.acquire()) {
ArrayList<ChannelReader> translogs = new ArrayList<>();
try {
for (ChannelImmutableReader translog : uncommittedTranslogs) {
translogs.add(translog.clone());
}
translogs.add(current.reader());
FsView view = new FsView(translogs);
// this is safe as we know that no new translog is being made at the moment
// (we hold a read lock) and the view will be notified of any future one
outstandingViews.add(view);
translogs.clear();
return view;
} finally {
// close if anything happend and we didn't reach the clear
IOUtils.closeWhileHandlingException(translogs);
}
}
}
/**
* Sync's the translog.
*/
public void sync() throws IOException {
try (ReleasableLock lock = readLock.acquire()) {
if (closed.get()) {
return;
}
current.sync();
}
}
public boolean syncNeeded() {
try (ReleasableLock lock = readLock.acquire()) {
return current.syncNeeded();
}
}
public void syncOnEachOperation(boolean syncOnEachOperation) {
this.syncOnEachOperation = syncOnEachOperation;
if (syncOnEachOperation) {
type = TranslogFile.Type.SIMPLE;
} else {
type = TranslogFile.Type.BUFFERED;
}
}
/** package private for testing */
String getFilename(long translogId) {
return TRANSLOG_FILE_PREFIX + translogId;
}
/**
* return stats
*/
public TranslogStats stats() {
// acquire lock to make the two numbers roughly consistent (no file change half way)
try (ReleasableLock lock = readLock.acquire()) {
return new TranslogStats(totalOperations(), sizeInBytes());
}
}
private boolean isReferencedTranslogId(long translogId) {
return translogId >= lastCommittedTranslogId;
}
private final class InternalChannelReference extends ChannelReference {
final long translogId;
public InternalChannelReference(long translogId, Path file, OpenOption... openOptions) throws IOException {
super(file, openOptions);
this.translogId = translogId;
}
@Override
protected void closeInternal() {
super.closeInternal();
try (ReleasableLock lock = writeLock.acquire()) {
if (isReferencedTranslogId(translogId) == false) {
// if the given path is not the current we can safely delete the file since all references are released
logger.trace("delete translog file - not referenced and not current anymore {}", file());
IOUtils.deleteFilesIgnoringExceptions(file());
}
}
}
}
/**
* a view into the translog, capturing all translog file at the moment of creation
* and updated with any future translog.
*/
class FsView implements View {
boolean closed;
// last in this list is always FsTranslog.current
final List<ChannelReader> orderedTranslogs;
FsView(List<ChannelReader> orderedTranslogs) {
assert orderedTranslogs.isEmpty() == false;
// clone so we can safely mutate..
this.orderedTranslogs = new ArrayList<>(orderedTranslogs);
}
/**
* Called by the parent class when ever the current translog changes
*
* @param oldCurrent a new read only reader for the old current (should replace the previous reference)
* @param newCurrent a reader into the new current.
*/
synchronized void onNewTranslog(ChannelReader oldCurrent, ChannelReader newCurrent) throws IOException {
// even though the close method removes this view from outstandingViews, there is no synchronisation in place
// between that operation and an ongoing addition of a new translog, already having an iterator.
// As such, this method can be called despite of the fact that we are closed. We need to check and ignore.
if (closed) {
// we have to close the new references created for as as we will not hold them
IOUtils.close(oldCurrent, newCurrent);
return;
}
orderedTranslogs.remove(orderedTranslogs.size() - 1).close();
orderedTranslogs.add(oldCurrent);
orderedTranslogs.add(newCurrent);
}
@Override
public synchronized long minTranslogId() {
ensureOpen();
return orderedTranslogs.get(0).translogId();
}
@Override
public synchronized int totalOperations() {
int ops = 0;
for (ChannelReader translog : orderedTranslogs) {
int tops = translog.totalOperations();
if (tops == ChannelReader.UNKNOWN_OP_COUNT) {
return -1;
}
ops += tops;
}
return ops;
}
@Override
public synchronized long sizeInBytes() {
long size = 0;
for (ChannelReader translog : orderedTranslogs) {
size += translog.sizeInBytes();
}
return size;
}
public synchronized Snapshot snapshot() {
ensureOpen();
return createdSnapshot(orderedTranslogs.toArray(new ChannelReader[orderedTranslogs.size()]));
}
void ensureOpen() {
if (closed) {
throw new ElasticsearchException("View is already closed");
}
}
@Override
public void close() {
List<ChannelReader> toClose = new ArrayList<>();
try {
synchronized (this) {
if (closed == false) {
logger.trace("closing view starting at translog [{}]", minTranslogId());
closed = true;
outstandingViews.remove(this);
toClose.addAll(orderedTranslogs);
orderedTranslogs.clear();
}
}
} finally {
try {
// Close out of lock to prevent deadlocks between channel close which checks for
// references in InternalChannelReference.closeInternal (waiting on a read lock)
// and other FsTranslog#newTranslog calling FsView.onNewTranslog (while having a write lock)
IOUtils.close(toClose);
} catch (Exception e) {
throw new ElasticsearchException("failed to close view", e);
}
}
}
}
class Sync implements Runnable {
@Override
public void run() {
// don't re-schedule if its closed..., we are done
if (closed.get()) {
return;
}
if (syncNeeded()) {
threadPool.executor(ThreadPool.Names.FLUSH).execute(new Runnable() {
@Override
public void run() {
try {
sync();
} catch (Exception e) {
logger.warn("failed to sync translog", e);
}
if (closed.get() == false) {
syncScheduler = threadPool.schedule(syncInterval, ThreadPool.Names.SAME, Sync.this);
}
}
});
} else {
syncScheduler = threadPool.schedule(syncInterval, ThreadPool.Names.SAME, Sync.this);
}
}
}
public static class Location implements Accountable {
public final long translogId; public final long translogId;
public final long translogLocation; public final long translogLocation;
@ -157,7 +716,7 @@ public interface Translog extends IndexShardComponent {
/** /**
* A snapshot of the transaction log, allows to iterate over all the transaction log operations. * A snapshot of the transaction log, allows to iterate over all the transaction log operations.
*/ */
static interface Snapshot extends Releasable { public interface Snapshot extends Releasable {
/** /**
* The total number of operations in the translog. * The total number of operations in the translog.
@ -172,7 +731,7 @@ public interface Translog extends IndexShardComponent {
} }
/** a view into the current translog that receives all operations from the moment created */ /** a view into the current translog that receives all operations from the moment created */
interface View extends Releasable { public interface View extends Releasable {
/** /**
* The total number of operations in the view. * The total number of operations in the view.
@ -196,8 +755,8 @@ public interface Translog extends IndexShardComponent {
* A generic interface representing an operation performed on the transaction log. * A generic interface representing an operation performed on the transaction log.
* Each is associated with a type. * Each is associated with a type.
*/ */
static interface Operation extends Streamable { public interface Operation extends Streamable {
static enum Type { enum Type {
CREATE((byte) 1), CREATE((byte) 1),
SAVE((byte) 2), SAVE((byte) 2),
DELETE((byte) 3), DELETE((byte) 3),
@ -237,7 +796,7 @@ public interface Translog extends IndexShardComponent {
} }
static class Source { public static class Source {
public final BytesReference source; public final BytesReference source;
public final String routing; public final String routing;
public final String parent; public final String parent;
@ -253,7 +812,7 @@ public interface Translog extends IndexShardComponent {
} }
} }
static class Create implements Operation { public static class Create implements Operation {
public static final int SERIALIZATION_FORMAT = 6; public static final int SERIALIZATION_FORMAT = 6;
private String id; private String id;
@ -446,7 +1005,7 @@ public interface Translog extends IndexShardComponent {
} }
} }
static class Index implements Operation { public static class Index implements Operation {
public static final int SERIALIZATION_FORMAT = 6; public static final int SERIALIZATION_FORMAT = 6;
private String id; private String id;
@ -641,7 +1200,7 @@ public interface Translog extends IndexShardComponent {
} }
} }
static class Delete implements Operation { public static class Delete implements Operation {
public static final int SERIALIZATION_FORMAT = 2; public static final int SERIALIZATION_FORMAT = 2;
private Term uid; private Term uid;
@ -751,7 +1310,7 @@ public interface Translog extends IndexShardComponent {
/** @deprecated Delete-by-query is removed in 2.0, but we keep this so translog can replay on upgrade. */ /** @deprecated Delete-by-query is removed in 2.0, but we keep this so translog can replay on upgrade. */
@Deprecated @Deprecated
static class DeleteByQuery implements Operation { public static class DeleteByQuery implements Operation {
public static final int SERIALIZATION_FORMAT = 2; public static final int SERIALIZATION_FORMAT = 2;
private BytesReference source; private BytesReference source;

View File

@ -17,27 +17,25 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.translog.fs; package org.elasticsearch.index.translog;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
public abstract class FsTranslogFile extends FsChannelReader { public abstract class TranslogFile extends ChannelReader {
protected final ShardId shardId; protected final ShardId shardId;
protected final ReleasableLock readLock; protected final ReleasableLock readLock;
protected final ReleasableLock writeLock; protected final ReleasableLock writeLock;
public FsTranslogFile(ShardId shardId, long id, ChannelReference channelReference) { public TranslogFile(ShardId shardId, long id, ChannelReference channelReference) {
super(id, channelReference); super(id, channelReference);
this.shardId = shardId; this.shardId = shardId;
ReadWriteLock rwl = new ReentrantReadWriteLock(); ReadWriteLock rwl = new ReentrantReadWriteLock();
@ -46,22 +44,22 @@ public abstract class FsTranslogFile extends FsChannelReader {
} }
public static enum Type { public enum Type {
SIMPLE() { SIMPLE() {
@Override @Override
public FsTranslogFile create(ShardId shardId, long id, ChannelReference channelReference, int bufferSize) throws IOException { public TranslogFile create(ShardId shardId, long id, ChannelReference channelReference, int bufferSize) throws IOException {
return new SimpleFsTranslogFile(shardId, id, channelReference); return new SimpleTranslogFile(shardId, id, channelReference);
} }
}, },
BUFFERED() { BUFFERED() {
@Override @Override
public FsTranslogFile create(ShardId shardId, long id, ChannelReference channelReference, int bufferSize) throws IOException { public TranslogFile create(ShardId shardId, long id, ChannelReference channelReference, int bufferSize) throws IOException {
return new BufferingFsTranslogFile(shardId, id, channelReference, bufferSize); return new BufferingTranslogFile(shardId, id, channelReference, bufferSize);
} }
}; };
public abstract FsTranslogFile create(ShardId shardId, long id, ChannelReference raf, int bufferSize) throws IOException; public abstract TranslogFile create(ShardId shardId, long id, ChannelReference raf, int bufferSize) throws IOException;
public static Type fromString(String type) { public static Type fromString(String type) {
if (SIMPLE.name().equalsIgnoreCase(type)) { if (SIMPLE.name().equalsIgnoreCase(type)) {
@ -78,7 +76,7 @@ public abstract class FsTranslogFile extends FsChannelReader {
public abstract Translog.Location add(BytesReference data) throws IOException; public abstract Translog.Location add(BytesReference data) throws IOException;
/** reuse resources from another translog file, which is guaranteed not to be used anymore */ /** reuse resources from another translog file, which is guaranteed not to be used anymore */
public abstract void reuse(FsTranslogFile other) throws TranslogException; public abstract void reuse(TranslogFile other) throws TranslogException;
/** change the size of the internal buffer if relevant */ /** change the size of the internal buffer if relevant */
public abstract void updateBufferSize(int bufferSize) throws TranslogException; public abstract void updateBufferSize(int bufferSize) throws TranslogException;
@ -90,19 +88,19 @@ public abstract class FsTranslogFile extends FsChannelReader {
public abstract boolean syncNeeded(); public abstract boolean syncNeeded();
@Override @Override
public FsChannelSnapshot newSnapshot() { public ChannelSnapshot newSnapshot() {
return new FsChannelSnapshot(immutableReader()); return new ChannelSnapshot(immutableReader());
} }
/** /**
* returns a new reader that follows the current writes (most importantly allows making * returns a new reader that follows the current writes (most importantly allows making
* repeated snapshots that includes new content) * repeated snapshots that includes new content)
*/ */
public FsChannelReader reader() { public ChannelReader reader() {
channelReference.incRef(); channelReference.incRef();
boolean success = false; boolean success = false;
try { try {
FsChannelReader reader = new InnerReader(this.id, channelReference); ChannelReader reader = new InnerReader(this.id, channelReference);
success = true; success = true;
return reader; return reader;
} finally { } finally {
@ -114,7 +112,7 @@ public abstract class FsTranslogFile extends FsChannelReader {
/** returns a new immutable reader which only exposes the current written operation * */ /** returns a new immutable reader which only exposes the current written operation * */
abstract public FsChannelImmutableReader immutableReader(); abstract public ChannelImmutableReader immutableReader();
boolean assertBytesAtLocation(Translog.Location location, BytesReference expectedBytes) throws IOException { boolean assertBytesAtLocation(Translog.Location location, BytesReference expectedBytes) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(location.size); ByteBuffer buffer = ByteBuffer.allocate(location.size);
@ -126,7 +124,7 @@ public abstract class FsTranslogFile extends FsChannelReader {
* this class is used when one wants a reference to this file which exposes all recently written operation. * this class is used when one wants a reference to this file which exposes all recently written operation.
* as such it needs access to the internals of the current reader * as such it needs access to the internals of the current reader
*/ */
final class InnerReader extends FsChannelReader { final class InnerReader extends ChannelReader {
public InnerReader(long id, ChannelReference channelReference) { public InnerReader(long id, ChannelReference channelReference) {
super(id, channelReference); super(id, channelReference);
@ -134,22 +132,22 @@ public abstract class FsTranslogFile extends FsChannelReader {
@Override @Override
public long sizeInBytes() { public long sizeInBytes() {
return FsTranslogFile.this.sizeInBytes(); return TranslogFile.this.sizeInBytes();
} }
@Override @Override
public int totalOperations() { public int totalOperations() {
return FsTranslogFile.this.totalOperations(); return TranslogFile.this.totalOperations();
} }
@Override @Override
protected void readBytes(ByteBuffer buffer, long position) throws IOException { protected void readBytes(ByteBuffer buffer, long position) throws IOException {
FsTranslogFile.this.readBytes(buffer, position); TranslogFile.this.readBytes(buffer, position);
} }
@Override @Override
public FsChannelSnapshot newSnapshot() { public ChannelSnapshot newSnapshot() {
return FsTranslogFile.this.newSnapshot(); return TranslogFile.this.newSnapshot();
} }
} }
} }

View File

@ -17,23 +17,21 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.translog.fs; package org.elasticsearch.index.translog;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TruncatedTranslogException;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
public class FsTranslogSnapshot implements Translog.Snapshot { public class TranslogSnapshot implements Translog.Snapshot {
private final List<FsChannelSnapshot> orderedTranslogs; private final List<ChannelSnapshot> orderedTranslogs;
private final ESLogger logger; private final ESLogger logger;
private final ByteBuffer cacheBuffer; private final ByteBuffer cacheBuffer;
private AtomicBoolean closed = new AtomicBoolean(false); private AtomicBoolean closed = new AtomicBoolean(false);
@ -44,15 +42,15 @@ public class FsTranslogSnapshot implements Translog.Snapshot {
* Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point * Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point
* at the end of the last operation in this snapshot. * at the end of the last operation in this snapshot.
*/ */
public FsTranslogSnapshot(List<FsChannelSnapshot> orderedTranslogs, ESLogger logger) { public TranslogSnapshot(List<ChannelSnapshot> orderedTranslogs, ESLogger logger) {
this.orderedTranslogs = orderedTranslogs; this.orderedTranslogs = orderedTranslogs;
this.logger = logger; this.logger = logger;
int ops = 0; int ops = 0;
for (FsChannelSnapshot translog : orderedTranslogs) { for (ChannelSnapshot translog : orderedTranslogs) {
final int tops = translog.estimatedTotalOperations(); final int tops = translog.estimatedTotalOperations();
if (tops < 0) { if (tops < 0) {
ops = FsChannelReader.UNKNOWN_OP_COUNT; ops = ChannelReader.UNKNOWN_OP_COUNT;
break; break;
} }
ops += tops; ops += tops;
@ -72,7 +70,7 @@ public class FsTranslogSnapshot implements Translog.Snapshot {
public Translog.Operation next() throws IOException { public Translog.Operation next() throws IOException {
ensureOpen(); ensureOpen();
for (; currentTranslog < orderedTranslogs.size(); currentTranslog++) { for (; currentTranslog < orderedTranslogs.size(); currentTranslog++) {
final FsChannelSnapshot current = orderedTranslogs.get(currentTranslog); final ChannelSnapshot current = orderedTranslogs.get(currentTranslog);
Translog.Operation op = null; Translog.Operation op = null;
try { try {
op = current.next(cacheBuffer); op = current.next(cacheBuffer);

View File

@ -1,650 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog.fs;
import com.google.common.collect.Iterables;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
*
*/
public class FsTranslog extends AbstractIndexShardComponent implements Translog, Closeable {
public static final String INDEX_TRANSLOG_FS_TYPE = "index.translog.fs.type";
public static final String INDEX_TRANSLOG_BUFFER_SIZE = "index.translog.fs.buffer_size";
public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval";
public static final String TRANSLOG_FILE_PREFIX = "translog-";
static final Pattern PARSE_ID_PATTERN = Pattern.compile(TRANSLOG_FILE_PREFIX + "(\\d+)(\\.recovering)?$");
private final TimeValue syncInterval;
private volatile ScheduledFuture<?> syncScheduler;
// this is a concurrent set and is not protected by any of the locks. The main reason
// is that is being accessed by two separate classes (additions & reading are done by FsTranslog, remove by FsView when closed)
private final Set<FsView> outstandingViews = ConcurrentCollections.newConcurrentSet();
class ApplySettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
FsTranslogFile.Type type = FsTranslogFile.Type.fromString(settings.get(INDEX_TRANSLOG_FS_TYPE, FsTranslog.this.type.name()));
if (type != FsTranslog.this.type) {
logger.info("updating type from [{}] to [{}]", FsTranslog.this.type, type);
FsTranslog.this.type = type;
}
}
}
private final IndexSettingsService indexSettingsService;
private final BigArrays bigArrays;
private final ThreadPool threadPool;
protected final ReleasableLock readLock;
protected final ReleasableLock writeLock;
private final Path location;
// protected by the write lock
private long idGenerator = 1;
private FsTranslogFile current;
// ordered by age
private final List<FsChannelImmutableReader> uncommittedTranslogs = new ArrayList<>();
private long lastCommittedTranslogId = -1; // -1 is safe as it will not cause an translog deletion.
private FsTranslogFile.Type type;
private boolean syncOnEachOperation = false;
private volatile int bufferSize;
private final ApplySettings applySettings = new ApplySettings();
private final AtomicBoolean closed = new AtomicBoolean();
public FsTranslog(ShardId shardId, IndexSettingsService indexSettingsService,
BigArrays bigArrays, Path location, ThreadPool threadPool) throws IOException {
this(shardId, indexSettingsService.getSettings(), indexSettingsService, bigArrays, location, threadPool);
}
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings,
BigArrays bigArrays, Path location) throws IOException {
this(shardId, indexSettings, null, bigArrays, location, null);
}
private FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, @Nullable IndexSettingsService indexSettingsService,
BigArrays bigArrays, Path location, @Nullable ThreadPool threadPool) throws IOException {
super(shardId, indexSettings);
ReadWriteLock rwl = new ReentrantReadWriteLock();
readLock = new ReleasableLock(rwl.readLock());
writeLock = new ReleasableLock(rwl.writeLock());
this.indexSettingsService = indexSettingsService;
this.bigArrays = bigArrays;
this.location = location;
Files.createDirectories(this.location);
this.threadPool = threadPool;
this.type = FsTranslogFile.Type.fromString(indexSettings.get(INDEX_TRANSLOG_FS_TYPE, FsTranslogFile.Type.BUFFERED.name()));
this.bufferSize = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, ByteSizeValue.parseBytesSizeValue("64k")).bytes(); // Not really interesting, updated by IndexingMemoryController...
syncInterval = indexSettings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
if (syncInterval.millis() > 0 && threadPool != null) {
syncOnEachOperation(false);
syncScheduler = threadPool.schedule(syncInterval, ThreadPool.Names.SAME, new Sync());
} else if (syncInterval.millis() == 0) {
syncOnEachOperation(true);
}
if (indexSettingsService != null) {
indexSettingsService.addListener(applySettings);
}
try {
recoverFromFiles();
// now that we know which files are there, create a new current one.
current = createTranslogFile(null);
} catch (Throwable t) {
// close the opened translog files if we fail to create a new translog...
IOUtils.closeWhileHandlingException(uncommittedTranslogs);
throw t;
}
}
/** recover all translog files found on disk */
private void recoverFromFiles() throws IOException {
boolean success = false;
ArrayList<FsChannelImmutableReader> foundTranslogs = new ArrayList<>();
try (ReleasableLock lock = writeLock.acquire()) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(location, TRANSLOG_FILE_PREFIX + "[0-9]*")) {
for (Path file : stream) {
final long id = parseIdFromFileName(file);
if (id < 0) {
throw new TranslogException(shardId, "failed to parse id from file name matching pattern " + file);
}
idGenerator = Math.max(idGenerator, id + 1);
final ChannelReference raf = new InternalChannelReference(id, location.resolve(getFilename(id)), StandardOpenOption.READ);
foundTranslogs.add(new FsChannelImmutableReader(id, raf, raf.channel().size(), FsChannelReader.UNKNOWN_OP_COUNT));
logger.debug("found local translog with id [{}]", id);
}
}
CollectionUtil.timSort(foundTranslogs);
uncommittedTranslogs.addAll(foundTranslogs);
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(foundTranslogs);
}
}
}
/* extracts the translog id from a file name. returns -1 upon failure */
public static long parseIdFromFileName(Path translogFile) {
final String fileName = translogFile.getFileName().toString();
final Matcher matcher = PARSE_ID_PATTERN.matcher(fileName);
if (matcher.matches()) {
try {
return Long.parseLong(matcher.group(1));
} catch (NumberFormatException e) {
throw new ElasticsearchException("number formatting issue in a file that passed PARSE_ID_PATTERN: " + fileName + "]", e);
}
}
return -1;
}
@Override
public void updateBuffer(ByteSizeValue bufferSize) {
this.bufferSize = bufferSize.bytesAsInt();
try (ReleasableLock lock = writeLock.acquire()) {
current.updateBufferSize(this.bufferSize);
}
}
boolean isOpen() {
return closed.get() == false;
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
if (indexSettingsService != null) {
indexSettingsService.removeListener(applySettings);
}
try (ReleasableLock lock = writeLock.acquire()) {
try {
IOUtils.close(this.current);
} finally {
IOUtils.close(uncommittedTranslogs);
}
} finally {
FutureUtils.cancel(syncScheduler);
logger.debug("translog closed");
}
}
}
@Override
public Path location() {
return location;
}
@Override
public long currentId() {
try (ReleasableLock lock = readLock.acquire()) {
return current.translogId();
}
}
@Override
public int totalOperations() {
int ops = 0;
try (ReleasableLock lock = readLock.acquire()) {
ops += current.totalOperations();
for (FsChannelReader translog : uncommittedTranslogs) {
int tops = translog.totalOperations();
if (tops == FsChannelReader.UNKNOWN_OP_COUNT) {
return FsChannelReader.UNKNOWN_OP_COUNT;
}
ops += tops;
}
}
return ops;
}
@Override
public long sizeInBytes() {
long size = 0;
try (ReleasableLock lock = readLock.acquire()) {
size += current.sizeInBytes();
for (FsChannelReader translog : uncommittedTranslogs) {
size += translog.sizeInBytes();
}
}
return size;
}
@Override
public void markCommitted(final long translogId) throws FileNotFoundException {
try (ReleasableLock lock = writeLock.acquire()) {
logger.trace("updating translogs on commit of [{}]", translogId);
if (translogId < lastCommittedTranslogId) {
throw new IllegalArgumentException("committed translog id can only go up (current ["
+ lastCommittedTranslogId + "], got [" + translogId + "]");
}
boolean found = false;
if (current.translogId() == translogId) {
found = true;
} else {
if (translogId > current.translogId()) {
throw new IllegalArgumentException("committed translog id must be lower or equal to current id (current ["
+ current.translogId() + "], got [" + translogId + "]");
}
}
if (found == false) {
// try to find it in uncommittedTranslogs
for (FsChannelImmutableReader translog : uncommittedTranslogs) {
if (translog.translogId() == translogId) {
found = true;
break;
}
}
}
if (found == false) {
ArrayList<Long> currentIds = new ArrayList<>();
for (FsChannelReader translog : Iterables.concat(uncommittedTranslogs, Collections.singletonList(current))) {
currentIds.add(translog.translogId());
}
throw new FileNotFoundException("committed translog id can not be found (current ["
+ Strings.collectionToCommaDelimitedString(currentIds) + "], got [" + translogId + "]");
}
lastCommittedTranslogId = translogId;
while (uncommittedTranslogs.isEmpty() == false && uncommittedTranslogs.get(0).translogId() < translogId) {
FsChannelReader old = uncommittedTranslogs.remove(0);
logger.trace("removed [{}] from uncommitted translog list", old.translogId());
try {
old.close();
} catch (IOException e) {
logger.error("failed to closed old translog [{}] (committed id [{}])", e, old, translogId);
}
}
}
}
@Override
public long newTranslog() throws TranslogException, IOException {
try (ReleasableLock lock = writeLock.acquire()) {
final FsTranslogFile old = current;
final FsTranslogFile newFile = createTranslogFile(old);
current = newFile;
FsChannelImmutableReader reader = old.immutableReader();
uncommittedTranslogs.add(reader);
// notify all outstanding views of the new translog (no views are created now as
// we hold a write lock).
for (FsView view : outstandingViews) {
view.onNewTranslog(old.immutableReader(), current.reader());
}
IOUtils.close(old);
logger.trace("current translog set to [{}]", current.translogId());
return current.translogId();
}
}
protected FsTranslogFile createTranslogFile(@Nullable FsTranslogFile reuse) throws IOException {
FsTranslogFile newFile;
long size = Long.MAX_VALUE;
try {
long id = idGenerator++;
newFile = type.create(shardId, id, new InternalChannelReference(id, location.resolve(getFilename(id)), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW), bufferSize);
} catch (IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
}
if (reuse != null) {
newFile.reuse(reuse);
}
return newFile;
}
/**
* Read the Operation object from the given location, returns null if the
* Operation could not be read.
*/
@Override
public Translog.Operation read(Location location) {
try (ReleasableLock lock = readLock.acquire()) {
FsChannelReader reader = null;
if (current.translogId() == location.translogId) {
reader = current;
} else {
for (FsChannelReader translog : uncommittedTranslogs) {
if (translog.translogId() == location.translogId) {
reader = translog;
break;
}
}
}
return reader == null ? null : reader.read(location);
} catch (IOException e) {
throw new ElasticsearchException("failed to read source from translog location " + location, e);
}
}
@Override
public Location add(Operation operation) throws TranslogException {
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
try {
TranslogStreams.writeTranslogOperation(out, operation);
ReleasablePagedBytesReference bytes = out.bytes();
try (ReleasableLock lock = readLock.acquire()) {
Location location = current.add(bytes);
if (syncOnEachOperation) {
current.sync();
}
assert current.assertBytesAtLocation(location, bytes);
return location;
}
} catch (Throwable e) {
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
} finally {
Releasables.close(out.bytes());
}
}
@Override
public Snapshot newSnapshot() {
try (ReleasableLock lock = readLock.acquire()) {
// leave one place for current.
final FsChannelReader[] readers = uncommittedTranslogs.toArray(new FsChannelReader[uncommittedTranslogs.size() + 1]);
readers[readers.length - 1] = current;
return createdSnapshot(readers);
}
}
private Snapshot createdSnapshot(FsChannelReader... translogs) {
ArrayList<FsChannelSnapshot> channelSnapshots = new ArrayList<>();
boolean success = false;
try {
for (FsChannelReader translog : translogs) {
channelSnapshots.add(translog.newSnapshot());
}
Snapshot snapshot = new FsTranslogSnapshot(channelSnapshots, logger);
success = true;
return snapshot;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(channelSnapshots);
}
}
}
@Override
public Translog.View newView() {
// we need to acquire the read lock to make sure new translog is created
// and will be missed by the view we're making
try (ReleasableLock lock = readLock.acquire()) {
ArrayList<FsChannelReader> translogs = new ArrayList<>();
try {
for (FsChannelImmutableReader translog : uncommittedTranslogs) {
translogs.add(translog.clone());
}
translogs.add(current.reader());
FsView view = new FsView(translogs);
// this is safe as we know that no new translog is being made at the moment
// (we hold a read lock) and the view will be notified of any future one
outstandingViews.add(view);
translogs.clear();
return view;
} finally {
// close if anything happend and we didn't reach the clear
IOUtils.closeWhileHandlingException(translogs);
}
}
}
@Override
public void sync() throws IOException {
try (ReleasableLock lock = readLock.acquire()) {
if (closed.get()) {
return;
}
current.sync();
}
}
@Override
public boolean syncNeeded() {
try (ReleasableLock lock = readLock.acquire()) {
return current.syncNeeded();
}
}
@Override
public void syncOnEachOperation(boolean syncOnEachOperation) {
this.syncOnEachOperation = syncOnEachOperation;
if (syncOnEachOperation) {
type = FsTranslogFile.Type.SIMPLE;
} else {
type = FsTranslogFile.Type.BUFFERED;
}
}
/** package private for testing */
String getFilename(long translogId) {
return TRANSLOG_FILE_PREFIX + translogId;
}
@Override
public TranslogStats stats() {
// acquire lock to make the two numbers roughly consistent (no file change half way)
try (ReleasableLock lock = readLock.acquire()) {
return new TranslogStats(totalOperations(), sizeInBytes());
}
}
private boolean isReferencedTranslogId(long translogId) {
return translogId >= lastCommittedTranslogId;
}
private final class InternalChannelReference extends ChannelReference {
final long translogId;
public InternalChannelReference(long translogId, Path file, OpenOption... openOptions) throws IOException {
super(file, openOptions);
this.translogId = translogId;
}
@Override
protected void closeInternal() {
super.closeInternal();
try (ReleasableLock lock = writeLock.acquire()) {
if (isReferencedTranslogId(translogId) == false) {
// if the given path is not the current we can safely delete the file since all references are released
logger.trace("delete translog file - not referenced and not current anymore {}", file());
IOUtils.deleteFilesIgnoringExceptions(file());
}
}
}
}
/**
* a view into the translog, capturing all translog file at the moment of creation
* and updated with any future translog.
*/
class FsView implements View {
boolean closed;
// last in this list is always FsTranslog.current
final List<FsChannelReader> orderedTranslogs;
FsView(List<FsChannelReader> orderedTranslogs) {
assert orderedTranslogs.isEmpty() == false;
// clone so we can safely mutate..
this.orderedTranslogs = new ArrayList<>(orderedTranslogs);
}
/**
* Called by the parent class when ever the current translog changes
*
* @param oldCurrent a new read only reader for the old current (should replace the previous reference)
* @param newCurrent a reader into the new current.
*/
synchronized void onNewTranslog(FsChannelReader oldCurrent, FsChannelReader newCurrent) throws IOException {
// even though the close method removes this view from outstandingViews, there is no synchronisation in place
// between that operation and an ongoing addition of a new translog, already having an iterator.
// As such, this method can be called despite of the fact that we are closed. We need to check and ignore.
if (closed) {
// we have to close the new references created for as as we will not hold them
IOUtils.close(oldCurrent, newCurrent);
return;
}
orderedTranslogs.remove(orderedTranslogs.size() - 1).close();
orderedTranslogs.add(oldCurrent);
orderedTranslogs.add(newCurrent);
}
@Override
public synchronized long minTranslogId() {
ensureOpen();
return orderedTranslogs.get(0).translogId();
}
@Override
public synchronized int totalOperations() {
int ops = 0;
for (FsChannelReader translog : orderedTranslogs) {
int tops = translog.totalOperations();
if (tops == FsChannelReader.UNKNOWN_OP_COUNT) {
return -1;
}
ops += tops;
}
return ops;
}
@Override
public synchronized long sizeInBytes() {
long size = 0;
for (FsChannelReader translog : orderedTranslogs) {
size += translog.sizeInBytes();
}
return size;
}
public synchronized Snapshot snapshot() {
ensureOpen();
return createdSnapshot(orderedTranslogs.toArray(new FsChannelReader[orderedTranslogs.size()]));
}
void ensureOpen() {
if (closed) {
throw new ElasticsearchException("View is already closed");
}
}
@Override
public void close() {
List<FsChannelReader> toClose = new ArrayList<>();
try {
synchronized (this) {
if (closed == false) {
logger.trace("closing view starting at translog [{}]", minTranslogId());
closed = true;
outstandingViews.remove(this);
toClose.addAll(orderedTranslogs);
orderedTranslogs.clear();
}
}
} finally {
try {
// Close out of lock to prevent deadlocks between channel close which checks for
// references in InternalChannelReference.closeInternal (waiting on a read lock)
// and other FsTranslog#newTranslog calling FsView.onNewTranslog (while having a write lock)
IOUtils.close(toClose);
} catch (Exception e) {
throw new ElasticsearchException("failed to close view", e);
}
}
}
}
class Sync implements Runnable {
@Override
public void run() {
// don't re-schedule if its closed..., we are done
if (closed.get()) {
return;
}
if (syncNeeded()) {
threadPool.executor(ThreadPool.Names.FLUSH).execute(new Runnable() {
@Override
public void run() {
try {
sync();
} catch (Exception e) {
logger.warn("failed to sync translog", e);
}
if (closed.get() == false) {
syncScheduler = threadPool.schedule(syncInterval, ThreadPool.Names.SAME, Sync.this);
}
}
});
} else {
syncScheduler = threadPool.schedule(syncInterval, ThreadPool.Names.SAME, Sync.this);
}
}
}
}

View File

@ -79,7 +79,6 @@ import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.DirectoryUtils; import org.elasticsearch.index.store.DirectoryUtils;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.annotations.TestLogging;
@ -216,15 +215,15 @@ public class InternalEngineTests extends ElasticsearchTestCase {
return new Store(shardId, EMPTY_SETTINGS, directoryService, new DummyShardLock(shardId)); return new Store(shardId, EMPTY_SETTINGS, directoryService, new DummyShardLock(shardId));
} }
protected FsTranslog createTranslog() throws IOException { protected Translog createTranslog() throws IOException {
return createTranslog(primaryTranslogDir); return createTranslog(primaryTranslogDir);
} }
protected FsTranslog createTranslog(Path translogPath) throws IOException { protected Translog createTranslog(Path translogPath) throws IOException {
return new FsTranslog(shardId, EMPTY_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, translogPath); return new Translog(shardId, EMPTY_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, translogPath);
} }
protected FsTranslog createTranslogReplica() throws IOException { protected Translog createTranslogReplica() throws IOException {
return createTranslog(replicaTranslogDir); return createTranslog(replicaTranslogDir);
} }
@ -1546,7 +1545,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
// test that we can force start the engine , even if the translog is missing. // test that we can force start the engine , even if the translog is missing.
engine.close(); engine.close();
// fake a new translog, causing the engine to point to a missing one. // fake a new translog, causing the engine to point to a missing one.
FsTranslog translog = createTranslog(); Translog translog = createTranslog();
translog.markCommitted(translog.currentId()); translog.markCommitted(translog.currentId());
// we have to re-open the translog because o.w. it will complain about commit information going backwards, which is OK as we did a fake markComitted // we have to re-open the translog because o.w. it will complain about commit information going backwards, which is OK as we did a fake markComitted
translog.close(); translog.close();

View File

@ -63,7 +63,6 @@ import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.DirectoryUtils; import org.elasticsearch.index.store.DirectoryUtils;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.translog.fs; package org.elasticsearch.index.translog;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
@ -32,7 +32,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.*;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.After; import org.junit.After;
@ -64,7 +63,7 @@ public abstract class AbstractTranslogTests extends ElasticsearchTestCase {
protected final ShardId shardId = new ShardId(new Index("index"), 1); protected final ShardId shardId = new ShardId(new Index("index"), 1);
protected FsTranslog translog; protected Translog translog;
protected Path translogDir; protected Path translogDir;
@Override @Override
@ -103,7 +102,7 @@ public abstract class AbstractTranslogTests extends ElasticsearchTestCase {
} }
protected abstract FsTranslog create() throws IOException; protected abstract Translog create() throws IOException;
protected void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) { protected void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) {
@ -114,24 +113,24 @@ public abstract class AbstractTranslogTests extends ElasticsearchTestCase {
public void testIdParsingFromFile() { public void testIdParsingFromFile() {
long id = randomIntBetween(0, Integer.MAX_VALUE); long id = randomIntBetween(0, Integer.MAX_VALUE);
Path file = translogDir.resolve(FsTranslog.TRANSLOG_FILE_PREFIX + id); Path file = translogDir.resolve(Translog.TRANSLOG_FILE_PREFIX + id);
assertThat(FsTranslog.parseIdFromFileName(file), equalTo(id)); assertThat(Translog.parseIdFromFileName(file), equalTo(id));
file = translogDir.resolve(FsTranslog.TRANSLOG_FILE_PREFIX + id + ".recovering"); file = translogDir.resolve(Translog.TRANSLOG_FILE_PREFIX + id + ".recovering");
assertThat(FsTranslog.parseIdFromFileName(file), equalTo(id)); assertThat(Translog.parseIdFromFileName(file), equalTo(id));
file = translogDir.resolve(FsTranslog.TRANSLOG_FILE_PREFIX + randomNonTranslogPatternString(1, 10) + id); file = translogDir.resolve(Translog.TRANSLOG_FILE_PREFIX + randomNonTranslogPatternString(1, 10) + id);
assertThat(FsTranslog.parseIdFromFileName(file), equalTo(-1l)); assertThat(Translog.parseIdFromFileName(file), equalTo(-1l));
file = translogDir.resolve(randomNonTranslogPatternString(1, FsTranslog.TRANSLOG_FILE_PREFIX.length() - 1)); file = translogDir.resolve(randomNonTranslogPatternString(1, Translog.TRANSLOG_FILE_PREFIX.length() - 1));
assertThat(FsTranslog.parseIdFromFileName(file), equalTo(-1l)); assertThat(Translog.parseIdFromFileName(file), equalTo(-1l));
} }
private static String randomNonTranslogPatternString(int min, int max) { private static String randomNonTranslogPatternString(int min, int max) {
String string; String string;
do { do {
string = randomRealisticUnicodeOfCodepointLength(randomIntBetween(min, max)); string = randomRealisticUnicodeOfCodepointLength(randomIntBetween(min, max));
} while (FsTranslog.PARSE_ID_PATTERN.matcher(string).matches()); } while (Translog.PARSE_ID_PATTERN.matcher(string).matches());
return string; return string;
} }
@ -361,14 +360,14 @@ public abstract class AbstractTranslogTests extends ElasticsearchTestCase {
} }
public void assertFileIsPresent(FsTranslog translog, long id) { public void assertFileIsPresent(Translog translog, long id) {
if (Files.exists(translogDir.resolve(translog.getFilename(id)))) { if (Files.exists(translogDir.resolve(translog.getFilename(id)))) {
return; return;
} }
fail(translog.getFilename(id) + " is not present in any location: " + translog.location()); fail(translog.getFilename(id) + " is not present in any location: " + translog.location());
} }
public void assertFileDeleted(FsTranslog translog, long id) { public void assertFileDeleted(Translog translog, long id) {
assertFalse("translog [" + id + "] still exists", Files.exists(translog.location().resolve(translog.getFilename(id)))); assertFalse("translog [" + id + "] still exists", Files.exists(translog.location().resolve(translog.getFilename(id))));
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.translog.fs; package org.elasticsearch.index.translog;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
@ -32,10 +32,10 @@ import java.io.IOException;
public class FsBufferedTranslogTests extends AbstractTranslogTests { public class FsBufferedTranslogTests extends AbstractTranslogTests {
@Override @Override
protected FsTranslog create() throws IOException { protected Translog create() throws IOException {
return new FsTranslog(shardId, return new Translog(shardId,
ImmutableSettings.settingsBuilder() ImmutableSettings.settingsBuilder()
.put("index.translog.fs.type", FsTranslogFile.Type.BUFFERED.name()) .put("index.translog.fs.type", TranslogFile.Type.BUFFERED.name())
.put("index.translog.fs.buffer_size", 10 + randomInt(128 * 1024)) .put("index.translog.fs.buffer_size", 10 + randomInt(128 * 1024))
.build(), .build(),
BigArrays.NON_RECYCLING_INSTANCE, translogDir BigArrays.NON_RECYCLING_INSTANCE, translogDir

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.translog.fs; package org.elasticsearch.index.translog;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
@ -32,9 +32,9 @@ import java.io.IOException;
public class FsSimpleTranslogTests extends AbstractTranslogTests { public class FsSimpleTranslogTests extends AbstractTranslogTests {
@Override @Override
protected FsTranslog create() throws IOException { protected Translog create() throws IOException {
return new FsTranslog(shardId, return new Translog(shardId,
ImmutableSettings.settingsBuilder().put("index.translog.fs.type", FsTranslogFile.Type.SIMPLE.name()).build(), ImmutableSettings.settingsBuilder().put("index.translog.fs.type", TranslogFile.Type.SIMPLE.name()).build(),
BigArrays.NON_RECYCLING_INSTANCE, translogDir); BigArrays.NON_RECYCLING_INSTANCE, translogDir);
} }
} }

View File

@ -109,10 +109,9 @@ import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider; import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider; import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule; import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule;
import org.elasticsearch.index.store.StoreModule;
import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.index.translog.fs.FsTranslog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.fs.FsTranslogFile; import org.elasticsearch.index.translog.TranslogFile;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
@ -505,7 +504,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
} }
if (random.nextBoolean()) { if (random.nextBoolean()) {
builder.put(FsTranslog.INDEX_TRANSLOG_FS_TYPE, RandomPicks.randomFrom(random, FsTranslogFile.Type.values()).name()); builder.put(Translog.INDEX_TRANSLOG_FS_TYPE, RandomPicks.randomFrom(random, TranslogFile.Type.values()).name());
} }
if (random.nextBoolean()) { if (random.nextBoolean()) {

View File

@ -21,7 +21,6 @@ package org.elasticsearch.test.engine;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.translog.fs.FsTranslog;
/** /**
* *

View File

@ -24,7 +24,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.translog.fs.FsTranslog;
import java.io.IOException; import java.io.IOException;