When flushing, old transaction log is not removed, closes #1180.

This commit is contained in:
Shay Banon 2011-07-29 19:51:01 +03:00
parent 297a496998
commit 91f97bb7b6
3 changed files with 105 additions and 45 deletions

View File

@ -843,8 +843,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
indexWriter.commit(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map()); indexWriter.commit(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map());
translog.makeTransientCurrent(); translog.makeTransientCurrent();
} catch (Exception e) { } catch (Exception e) {
translog.revertTransient();
throw new FlushFailedEngineException(shardId, e); throw new FlushFailedEngineException(shardId, e);
} catch (OutOfMemoryError e) { } catch (OutOfMemoryError e) {
translog.revertTransient();
failEngine(e); failEngine(e);
throw new FlushFailedEngineException(shardId, e); throw new FlushFailedEngineException(shardId, e);
} }

View File

@ -87,6 +87,11 @@ public interface Translog extends IndexShardComponent {
*/ */
void makeTransientCurrent(); void makeTransientCurrent();
/**
* Reverts back to not have a transient translog.
*/
void revertTransient();
/** /**
* Adds a create operation to the transaction log. * Adds a create operation to the transaction log.
*/ */

View File

@ -35,12 +35,15 @@ import org.elasticsearch.index.translog.TranslogStreams;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class FsTranslog extends AbstractIndexShardComponent implements Translog { public class FsTranslog extends AbstractIndexShardComponent implements Translog {
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
private final File location; private final File location;
private volatile FsTranslogFile current; private volatile FsTranslogFile current;
@ -93,12 +96,17 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
} }
@Override public void clearUnreferenced() { @Override public void clearUnreferenced() {
rwl.writeLock().lock();
try {
File[] files = location.listFiles(); File[] files = location.listFiles();
if (files != null) { if (files != null) {
for (File file : files) { for (File file : files) {
if (file.getName().equals("translog-" + current.id())) { if (file.getName().equals("translog-" + current.id())) {
continue; continue;
} }
if (trans != null && file.getName().equals("translog-" + trans.id())) {
continue;
}
try { try {
file.delete(); file.delete();
} catch (Exception e) { } catch (Exception e) {
@ -106,9 +114,14 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
} }
} }
} }
} finally {
rwl.writeLock().unlock();
}
} }
@Override public void newTranslog(long id) throws TranslogException { @Override public void newTranslog(long id) throws TranslogException {
rwl.writeLock().lock();
try {
FsTranslogFile newFile; FsTranslogFile newFile;
try { try {
newFile = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id))); newFile = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id)));
@ -125,22 +138,52 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
} }
old.close(delete); old.close(delete);
} }
} finally {
rwl.writeLock().unlock();
}
} }
@Override public void newTransientTranslog(long id) throws TranslogException { @Override public void newTransientTranslog(long id) throws TranslogException {
rwl.writeLock().lock();
try { try {
assert this.trans == null;
this.trans = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id))); this.trans = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id)));
} catch (IOException e) { } catch (IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e); throw new TranslogException(shardId, "failed to create new translog file", e);
} finally {
rwl.writeLock().unlock();
} }
} }
@Override public void makeTransientCurrent() { @Override public void makeTransientCurrent() {
FsTranslogFile old;
rwl.writeLock().lock();
try {
assert this.trans != null;
old = current;
this.current = this.trans; this.current = this.trans;
this.trans = null; this.trans = null;
} finally {
rwl.writeLock().unlock();
}
old.close(true);
}
@Override public void revertTransient() {
FsTranslogFile old;
rwl.writeLock().lock();
try {
old = trans;
this.trans = null;
} finally {
rwl.writeLock().unlock();
}
old.close(true);
} }
public byte[] read(Location location) { public byte[] read(Location location) {
rwl.readLock().lock();
try {
FsTranslogFile trans = this.trans; FsTranslogFile trans = this.trans;
if (trans != null && trans.id() == location.translogId) { if (trans != null && trans.id() == location.translogId) {
try { try {
@ -152,15 +195,19 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
if (current.id() == location.translogId) { if (current.id() == location.translogId) {
try { try {
return current.read(location); return current.read(location);
} catch (IOException e) { } catch (Exception e) {
// ignore // ignore
} }
} }
return null; return null;
} finally {
rwl.readLock().unlock();
}
} }
@Override public Location add(Operation operation) throws TranslogException { @Override public Location add(Operation operation) throws TranslogException {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
rwl.readLock().lock();
try { try {
BytesStreamOutput out = cachedEntry.cachedBytes(); BytesStreamOutput out = cachedEntry.cachedBytes();
out.writeInt(0); // marker for the size... out.writeInt(0); // marker for the size...
@ -187,6 +234,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
} catch (Exception e) { } catch (Exception e) {
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
} finally { } finally {
rwl.readLock().unlock();
CachedStreamOutput.pushEntry(cachedEntry); CachedStreamOutput.pushEntry(cachedEntry);
} }
} }
@ -222,6 +270,8 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
} }
@Override public void close(boolean delete) { @Override public void close(boolean delete) {
rwl.writeLock().lock();
try {
FsTranslogFile current1 = this.current; FsTranslogFile current1 = this.current;
if (current1 != null) { if (current1 != null) {
current1.close(delete); current1.close(delete);
@ -230,5 +280,8 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
if (current1 != null) { if (current1 != null) {
current1.close(delete); current1.close(delete);
} }
} finally {
rwl.writeLock().unlock();
}
} }
} }