apply feedback from @bleskes

This commit is contained in:
Simon Willnauer 2015-12-14 14:36:20 +01:00
parent 2d03a6b808
commit c6003b6f13
3 changed files with 42 additions and 22 deletions

View File

@ -61,13 +61,13 @@ public final class BufferingTranslogWriter extends TranslogWriter {
}
writtenOffset += data.length();
totalOffset += data.length();
return new Translog.Location(generation, offset, data.length());
} else {
if (data.length() > buffer.length - bufferCount) {
flush();
}
data.writeTo(bufferOs);
totalOffset += data.length();
}
if (data.length() > buffer.length - bufferCount) {
flush();
}
data.writeTo(bufferOs);
totalOffset += data.length();
operationCounter++;
return new Translog.Location(generation, offset, data.length());
}
@ -109,29 +109,28 @@ public final class BufferingTranslogWriter extends TranslogWriter {
}
@Override
public synchronized boolean syncNeeded() {
public boolean syncNeeded() {
return totalOffset != lastSyncedOffset;
}
@Override
public void sync() throws IOException {
if (!syncNeeded()) {
return;
}
synchronized (this) {
ensureOpen();
public synchronized void sync() throws IOException {
if (syncNeeded()) {
ensureOpen(); // this call gives a better exception than the incRef if we are closed by a tragic event
channelReference.incRef();
try {
final long offsetToSync;
final int opsCounter;
try (ReleasableLock lock = writeLock.acquire()) {
flush();
offsetToSync = totalOffset;
opsCounter = operationCounter;
}
// we can do this outside of the write lock but we have to protect from
// concurrent syncs
try {
ensureOpen();
checkpoint(offsetToSync, operationCounter, channelReference);
ensureOpen(); // just for kicks - the checkpoint happens or not either way
checkpoint(offsetToSync, opsCounter, channelReference);
} catch (IOException ex) {
closeWithTragicEvent(ex);
throw ex;

View File

@ -55,7 +55,8 @@ public class TranslogWriter extends TranslogReader {
protected volatile int operationCounter;
/* the offset in bytes written to the file */
protected volatile long writtenOffset;
protected volatile Throwable tragicEvent;
/* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */
private volatile Throwable tragicEvent;
public TranslogWriter(ShardId shardId, long generation, ChannelReference channelReference) throws IOException {
@ -163,13 +164,15 @@ public class TranslogWriter extends TranslogReader {
/**
* write all buffered ops to disk and fsync file
*/
public void sync() throws IOException {
public synchronized void sync() throws IOException { // synchronized to ensure only one sync happens at a time
// check if we really need to sync here...
if (syncNeeded()) {
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
lastSyncedOffset = writtenOffset;
checkpoint(lastSyncedOffset, operationCounter, channelReference);
final long offset = writtenOffset;
final int opsCount = operationCounter;
checkpoint(offset, opsCount, channelReference);
lastSyncedOffset = offset;
}
}
}

View File

@ -26,6 +26,7 @@ import org.apache.lucene.mockfile.FilterFileChannel;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -1328,7 +1329,6 @@ public class TranslogTests extends ESTestCase {
int opsSynced = 0;
int opsAdded = 0;
boolean failed = false;
boolean syncFailed = true;
while(failed == false) {
try {
locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
@ -1338,7 +1338,8 @@ public class TranslogTests extends ESTestCase {
} catch (IOException ex) {
failed = true;
assertEquals("no space left on device", ex.getMessage());
} catch (Exception ex) {
} catch (TranslogException ex) {
// we catch IOExceptions in Translog#add -- that's how we got here
failed = true;
assertTrue(ex.toString(), ex.getMessage().startsWith("Failed to write operation"));
}
@ -1348,7 +1349,7 @@ public class TranslogTests extends ESTestCase {
if (randomBoolean()) {
try {
locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
opsSynced++;
fail("we are already closed");
} catch (AlreadyClosedException ex) {
assertNotNull(ex.getCause());
assertEquals(ex.getCause().getMessage(), "no space left on device");
@ -1387,4 +1388,21 @@ public class TranslogTests extends ESTestCase {
}
}
}
/**
 * Verifies that the operation count reported by a snapshot stays correct while operations are
 * appended to the translog one at a time.
 * <p>
 * After every add, a fresh snapshot must report {@code opsAdded + 1} estimated operations, each
 * previously recorded location must carry the current translog file generation, and the snapshot
 * must hand back a non-null operation for each of those locations.
 *
 * @throws IOException if adding to the translog, reading the snapshot, or closing the document
 *                     source fails
 */
public void testTranslogOpsCountIsCorrect() throws IOException {
    List<Translog.Location> locations = new ArrayList<>();
    int numOps = randomIntBetween(100, 200);
    // LineFileDocs writes pretty big docs so we cross buffer borders regularly. It is Closeable,
    // so it is managed by try-with-resources to avoid leaking its underlying reader.
    try (LineFileDocs lineFileDocs = new LineFileDocs(random())) {
        for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
            locations.add(translog.add(new Translog.Index("test", "" + opsAdded, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))));
            try (Translog.Snapshot snapshot = translog.newSnapshot()) {
                // one more operation than the zero-based index of the op we just added
                assertEquals(opsAdded + 1, snapshot.estimatedTotalOperations());
                // NOTE(review): this loop checks the first opsAdded operations only, i.e. it does not
                // read back the operation added in this iteration - confirm whether that is intended.
                for (int i = 0; i < opsAdded; i++) {
                    assertEquals("expected operation" + i + " to be in the current translog but wasn't", translog.currentFileGeneration(), locations.get(i).generation);
                    Translog.Operation next = snapshot.next();
                    assertNotNull("operation " + i + " must be non-null", next);
                }
            }
        }
    }
}
}