Fix visibility in buffered translog

- fix visiblity of last written position in translog
- while there, make sure to properly propagate the exception from sync()
This commit is contained in:
Shay Banon 2014-03-30 19:51:53 +02:00
parent a809bfbcb2
commit f424319f9a
5 changed files with 63 additions and 46 deletions

View File

@ -126,7 +126,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
/**
* Sync's the translog.
*/
void sync();
void sync() throws IOException;
boolean syncNeeded();

View File

@ -27,6 +27,7 @@ import org.elasticsearch.index.translog.TranslogException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -39,10 +40,11 @@ public class BufferingFsTranslogFile implements FsTranslogFile {
private final RafReference raf;
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
private final AtomicBoolean closed = new AtomicBoolean();
private volatile int operationCounter;
private long lastPosition;
private volatile long lastPosition;
private volatile long lastWrittenPosition;
private volatile long lastSyncPosition = 0;
@ -98,6 +100,7 @@ public class BufferingFsTranslogFile implements FsTranslogFile {
}
private void flushBuffer() throws IOException {
assert (((ReentrantReadWriteLock.WriteLock) rwl.writeLock()).isHeldByCurrentThread());
if (bufferCount > 0) {
// we use the channel to write, since on windows, writing to the RAF might not be reflected
// when reading through the channel
@ -146,40 +149,36 @@ public class BufferingFsTranslogFile implements FsTranslogFile {
}
@Override
public void sync() {
try {
// check if we really need to sync here...
long last = lastPosition;
if (last == lastSyncPosition) {
return;
}
lastSyncPosition = last;
rwl.writeLock().lock();
try {
flushBuffer();
} finally {
rwl.writeLock().unlock();
}
raf.channel().force(false);
} catch (Exception e) {
// ignore
public void sync() throws IOException {
if (!syncNeeded()) {
return;
}
rwl.writeLock().lock();
try {
flushBuffer();
lastSyncPosition = lastPosition;
} finally {
rwl.writeLock().unlock();
}
raf.channel().force(false);
}
@Override
public void close(boolean delete) {
if (!delete) {
rwl.writeLock().lock();
try {
flushBuffer();
sync();
} catch (IOException e) {
throw new TranslogException(shardId, "failed to close", e);
} finally {
rwl.writeLock().unlock();
}
if (!closed.compareAndSet(false, true)) {
return;
}
try {
if (!delete) {
try {
sync();
} catch (Exception e) {
throw new TranslogException(shardId, "failed to sync on close", e);
}
}
} finally {
raf.decreaseRefCount(delete);
}
raf.decreaseRefCount(delete);
}
@Override

View File

@ -391,12 +391,20 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
}
@Override
public void sync() {
public void sync() throws IOException {
FsTranslogFile current1 = this.current;
if (current1 == null) {
return;
}
current1.sync();
try {
current1.sync();
} catch (IOException e) {
// if we switches translots (!=), then this failure is not relevant
// we are working on a new translog
if (this.current == current1) {
throw e;
}
}
}
@Override

View File

@ -74,7 +74,7 @@ public interface FsTranslogFile {
void updateBufferSize(int bufferSize) throws TranslogException;
void sync();
void sync() throws IOException;
boolean syncNeeded();
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.index.translog.TranslogException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -34,6 +35,7 @@ public class SimpleFsTranslogFile implements FsTranslogFile {
private final long id;
private final ShardId shardId;
private final RafReference raf;
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicInteger operationCounter = new AtomicInteger();
@ -76,8 +78,20 @@ public class SimpleFsTranslogFile implements FsTranslogFile {
}
public void close(boolean delete) {
sync();
raf.decreaseRefCount(delete);
if (!closed.compareAndSet(false, true)) {
return;
}
try {
if (!delete) {
try {
sync();
} catch (Exception e) {
throw new TranslogException(shardId, "failed to sync on close", e);
}
}
} finally {
raf.decreaseRefCount(delete);
}
}
/**
@ -99,18 +113,14 @@ public class SimpleFsTranslogFile implements FsTranslogFile {
return lastWrittenPosition.get() != lastSyncPosition;
}
public void sync() {
try {
// check if we really need to sync here...
long last = lastWrittenPosition.get();
if (last == lastSyncPosition) {
return;
}
lastSyncPosition = last;
raf.channel().force(false);
} catch (Exception e) {
// ignore
public void sync() throws IOException {
// check if we really need to sync here...
long last = lastWrittenPosition.get();
if (last == lastSyncPosition) {
return;
}
lastSyncPosition = last;
raf.channel().force(false);
}
@Override