diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index 73494dcc39d..53e79f11676 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -843,8 +843,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { indexWriter.commit(MapBuilder.newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map()); translog.makeTransientCurrent(); } catch (Exception e) { + translog.revertTransient(); throw new FlushFailedEngineException(shardId, e); } catch (OutOfMemoryError e) { + translog.revertTransient(); failEngine(e); throw new FlushFailedEngineException(shardId, e); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java index c739af308e7..768a1b26dd5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -87,6 +87,11 @@ public interface Translog extends IndexShardComponent { */ void makeTransientCurrent(); + /** + * Reverts back to not have a transient translog. + */ + void revertTransient(); + /** * Adds a create operation to the transaction log. */ diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index a9617b18ebd..582cf5163c2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -35,12 +35,15 @@ import org.elasticsearch.index.translog.TranslogStreams; import java.io.File; import java.io.IOException; import java.nio.channels.ClosedChannelException; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @author kimchy (shay.banon) */ public class FsTranslog extends AbstractIndexShardComponent implements Translog { + private final ReadWriteLock rwl = new ReentrantReadWriteLock(); private final File location; private volatile FsTranslogFile current; @@ -93,74 +96,118 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } @Override public void clearUnreferenced() { - File[] files = location.listFiles(); - if (files != null) { - for (File file : files) { - if (file.getName().equals("translog-" + current.id())) { - continue; - } - try { - file.delete(); - } catch (Exception e) { - // ignore + rwl.writeLock().lock(); + try { + File[] files = location.listFiles(); + if (files != null) { + for (File file : files) { + if (file.getName().equals("translog-" + current.id())) { + continue; + } + if (trans != null && file.getName().equals("translog-" + trans.id())) { + continue; + } + try { + file.delete(); + } catch (Exception e) { + // ignore + } } } + } finally { + rwl.writeLock().unlock(); } } @Override public void newTranslog(long id) throws TranslogException { - FsTranslogFile newFile; + rwl.writeLock().lock(); try { - newFile = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id))); - } catch (IOException e) { - throw new TranslogException(shardId, "failed to create new translog file", e); - } - FsTranslogFile old = current; - current = newFile; - if (old != null) { - // we might create a new translog overriding the current translog id - boolean delete = true; - if (old.id() == id) { - delete = false; + FsTranslogFile newFile; + try { + newFile = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id))); + } catch (IOException e) { + throw new TranslogException(shardId, "failed to create new translog file", e); } - old.close(delete); + FsTranslogFile old = current; + current = newFile; + if (old != null) { + // we might create a new translog overriding the current translog id + boolean delete = true; + if (old.id() == id) { + delete = false; + } + old.close(delete); + } + } finally { + rwl.writeLock().unlock(); } } @Override public void newTransientTranslog(long id) throws TranslogException { + rwl.writeLock().lock(); try { + assert this.trans == null; this.trans = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id))); } catch (IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); + } finally { + rwl.writeLock().unlock(); } } @Override public void makeTransientCurrent() { - this.current = this.trans; - this.trans = null; + FsTranslogFile old; + rwl.writeLock().lock(); + try { + assert this.trans != null; + old = current; + this.current = this.trans; + this.trans = null; + } finally { + rwl.writeLock().unlock(); + } + old.close(true); + } + + @Override public void revertTransient() { + FsTranslogFile old; + rwl.writeLock().lock(); + try { + old = trans; + this.trans = null; + } finally { + rwl.writeLock().unlock(); + } + old.close(true); } public byte[] read(Location location) { - FsTranslogFile trans = this.trans; - if (trans != null && trans.id() == location.translogId) { - try { - return trans.read(location); - } catch (Exception e) { - // ignore + rwl.readLock().lock(); + try { + FsTranslogFile trans = this.trans; + if (trans != null && trans.id() == location.translogId) { + try { + return trans.read(location); + } catch (Exception e) { + // ignore + } } - } - if (current.id() == location.translogId) { - try { - return current.read(location); - } catch (IOException e) { - // ignore + if (current.id() == location.translogId) { + try { + return current.read(location); + } catch (Exception e) { + // ignore + } } + return null; + } finally { + rwl.readLock().unlock(); } - return null; } @Override public Location add(Operation operation) throws TranslogException { CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); + rwl.readLock().lock(); try { BytesStreamOutput out = cachedEntry.cachedBytes(); out.writeInt(0); // marker for the size... @@ -187,6 +234,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } catch (Exception e) { throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); } finally { + rwl.readLock().unlock(); CachedStreamOutput.pushEntry(cachedEntry); } } @@ -222,13 +270,18 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } @Override public void close(boolean delete) { - FsTranslogFile current1 = this.current; - if (current1 != null) { - current1.close(delete); - } - current1 = this.trans; - if (current1 != null) { - current1.close(delete); + rwl.writeLock().lock(); + try { + FsTranslogFile current1 = this.current; + if (current1 != null) { + current1.close(delete); + } + current1 = this.trans; + if (current1 != null) { + current1.close(delete); + } + } finally { + rwl.writeLock().unlock(); } } }