Internal: harden recovery for old segments

When a lucene 4.8+ file is transferred, Store returns a VerifyingIndexOutput
that verifies both the CRC32 integrity and the length of the file.

However, for older files, problems can make it to the lucene level. This is not great
since older lucene files aren't especially strong as far as detecting issues here.

For example, if a network transfer is closed on the remote side, we might write a
truncated file... which old lucene formats may or may not detect.

The idea here is to verify old files with their legacy Adler32 checksum, plus expected
length. If they don't have an Adler32 (segments_N, jurassic elasticsearch?, its optional
as far as the protocol goes), then at least check the length.

We could improve it for segments_N, its had an embedded CRC32 forever in lucene, but this
gets trickier. Long term, we should also try to also improve tests around here, especially
backwards compat testing, we should test that detected corruptions are handled properly.

Closes #8399

Conflicts:
	src/main/java/org/elasticsearch/index/store/Store.java
	src/test/java/org/elasticsearch/index/store/StoreTest.java
This commit is contained in:
Robert Muir 2014-11-09 03:17:26 -05:00
parent f47fb6b1cf
commit 0eb3402795
5 changed files with 329 additions and 31 deletions

View File

@ -0,0 +1,123 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.BufferedChecksum;
import org.apache.lucene.store.IndexOutput;
import java.io.IOException;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
/**
* Implements verification checks to the best extent possible
* against legacy segments.
* <p>
* For files since ES 1.3, we have a lucene checksum, and
* we verify both CRC32 + length from that.
* For older segment files, we have an elasticsearch Adler32 checksum
* and a length, except for commit points.
* For older commit points, we only have the length in metadata,
* but lucene always wrote a CRC32 checksum we can verify in the future, too.
* For (Jurassic?) files, we dont have an Adler32 checksum at all,
* since its optional in the protocol. But we always know the length.
* @deprecated only to support old segments
*/
@Deprecated
class LegacyVerification {
// TODO: add a verifier for old lucene segments_N that also checks CRC.
// but for now, at least truncation is detected here (as length will be checked)
/**
* verifies Adler32 + length for index files before lucene 4.8
*/
static class Adler32VerifyingIndexOutput extends VerifyingIndexOutput {
final String adler32;
final long length;
final Checksum checksum = new BufferedChecksum(new Adler32());
long written;
public Adler32VerifyingIndexOutput(IndexOutput out, String adler32, long length) {
super(out);
this.adler32 = adler32;
this.length = length;
}
@Override
public void verify() throws IOException {
if (written != length) {
throw new CorruptIndexException("expected length=" + length + " != actual length: " + written + " : file truncated?", out.toString());
}
final String actualChecksum = Store.digestToString(checksum.getValue());
if (!adler32.equals(actualChecksum)) {
throw new CorruptIndexException("checksum failed (hardware problem?) : expected=" + adler32 +
" actual=" + actualChecksum, out.toString());
}
}
@Override
public void writeByte(byte b) throws IOException {
out.writeByte(b);
checksum.update(b);
written++;
}
@Override
public void writeBytes(byte[] bytes, int offset, int length) throws IOException {
out.writeBytes(bytes, offset, length);
checksum.update(bytes, offset, length);
written += length;
}
}
/**
* verifies length for index files before lucene 4.8
*/
static class LengthVerifyingIndexOutput extends VerifyingIndexOutput {
final long length;
long written;
public LengthVerifyingIndexOutput(IndexOutput out, long length) {
super(out);
this.length = length;
}
@Override
public void verify() throws IOException {
if (written != length) {
throw new CorruptIndexException("expected length=" + length + " != actual length: " + written + " : file truncated?", out.toString());
}
}
@Override
public void writeByte(byte b) throws IOException {
out.writeByte(b);
written++;
}
@Override
public void writeBytes(byte[] bytes, int offset, int length) throws IOException {
out.writeBytes(bytes, offset, length);
written += length;
}
}
}

View File

@ -386,12 +386,18 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
IndexOutput output = directory().createOutput(fileName, context); IndexOutput output = directory().createOutput(fileName, context);
boolean success = false; boolean success = false;
try { try {
if (metadata.hasLegacyChecksum() || metadata.checksum() == null) { if (metadata.hasLegacyChecksum()) {
logger.debug("create legacy output for {}", fileName); logger.debug("create legacy adler32 output for {}", fileName);
output = new LegacyVerification.Adler32VerifyingIndexOutput(output, metadata.checksum(), metadata.length());
} else if (metadata.checksum() == null) {
// TODO: when the file is a segments_N, we can still CRC-32 + length for more safety
// its had that checksum forever.
logger.debug("create legacy length-only output for {}", fileName);
output = new LegacyVerification.LengthVerifyingIndexOutput(output, metadata.length());
} else { } else {
assert metadata.writtenBy() != null; assert metadata.writtenBy() != null;
assert metadata.writtenBy().onOrAfter(Version.LUCENE_4_8_0); assert metadata.writtenBy().onOrAfter(Version.LUCENE_4_8);
output = new VerifyingIndexOutput(metadata, output); output = new LuceneVerifyingIndexOutput(metadata, output);
} }
success = true; success = true;
} finally { } finally {
@ -855,39 +861,20 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
} }
static class VerifyingIndexOutput extends IndexOutput { static class LuceneVerifyingIndexOutput extends VerifyingIndexOutput {
private final StoreFileMetaData metadata; private final StoreFileMetaData metadata;
private final IndexOutput output;
private long writtenBytes; private long writtenBytes;
private final long checksumPosition; private final long checksumPosition;
private String actualChecksum; private String actualChecksum;
VerifyingIndexOutput(StoreFileMetaData metadata, IndexOutput actualOutput) { LuceneVerifyingIndexOutput(StoreFileMetaData metadata, IndexOutput out) {
super(out);
this.metadata = metadata; this.metadata = metadata;
this.output = actualOutput;
checksumPosition = metadata.length() - 8; // the last 8 bytes are the checksum checksumPosition = metadata.length() - 8; // the last 8 bytes are the checksum
} }
@Override @Override
public void close() throws IOException {
output.close();
}
@Override
public long getFilePointer() {
return output.getFilePointer();
}
@Override
public long getChecksum() throws IOException {
return output.getChecksum();
}
/**
* Verifies the checksum and compares the written length with the expected file length. This method should bec
* called after all data has been written to this output.
*/
public void verify() throws IOException { public void verify() throws IOException {
if (metadata.checksum().equals(actualChecksum) && writtenBytes == metadata.length()) { if (metadata.checksum().equals(actualChecksum) && writtenBytes == metadata.length()) {
return; return;
@ -902,7 +889,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
if (writtenBytes++ == checksumPosition) { if (writtenBytes++ == checksumPosition) {
readAndCompareChecksum(); readAndCompareChecksum();
} }
output.writeByte(b); out.writeByte(b);
} }
private void readAndCompareChecksum() throws IOException { private void readAndCompareChecksum() throws IOException {
@ -919,13 +906,13 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
if (writtenBytes + length > checksumPosition && actualChecksum == null) { if (writtenBytes + length > checksumPosition && actualChecksum == null) {
assert writtenBytes <= checksumPosition; assert writtenBytes <= checksumPosition;
final int bytesToWrite = (int)(checksumPosition-writtenBytes); final int bytesToWrite = (int)(checksumPosition-writtenBytes);
output.writeBytes(b, offset, bytesToWrite); out.writeBytes(b, offset, bytesToWrite);
readAndCompareChecksum(); readAndCompareChecksum();
offset += bytesToWrite; offset += bytesToWrite;
length -= bytesToWrite; length -= bytesToWrite;
writtenBytes += bytesToWrite; writtenBytes += bytesToWrite;
} }
output.writeBytes(b, offset, length); out.writeBytes(b, offset, length);
writtenBytes += length; writtenBytes += length;
} }

View File

@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store;
import java.io.IOException;
import org.apache.lucene.store.IndexOutput;
/**
* abstract class for verifying what was written.
* subclasses override {@link #writeByte(byte)} and {@link #writeBytes(byte[], int, int)}
*/
// do NOT optimize this class for performance
public abstract class VerifyingIndexOutput extends IndexOutput {
protected final IndexOutput out;
/** Sole constructor */
VerifyingIndexOutput(IndexOutput out) {
this.out = out;
}
/**
* Verifies the checksum and compares the written length with the expected file length. This method should be
* called after all data has been written to this output.
*/
public abstract void verify() throws IOException;
// default implementations... forwarding to delegate
@Override
public final void close() throws IOException {
out.close();
}
@Override
public final long getChecksum() throws IOException {
return out.getChecksum();
}
@Override
public final long getFilePointer() {
return out.getFilePointer();
}
}

View File

@ -114,7 +114,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
indexInput.seek(0); indexInput.seek(0);
BytesRef ref = new BytesRef(scaledRandomIntBetween(1, 1024)); BytesRef ref = new BytesRef(scaledRandomIntBetween(1, 1024));
long length = indexInput.length(); long length = indexInput.length();
IndexOutput verifyingOutput = new Store.VerifyingIndexOutput(new StoreFileMetaData("foo1.bar", length, checksum), dir.createOutput("foo1.bar", IOContext.DEFAULT)); IndexOutput verifyingOutput = new Store.LuceneVerifyingIndexOutput(new StoreFileMetaData("foo1.bar", length, checksum), dir.createOutput("foo1.bar", IOContext.DEFAULT));
while (length > 0) { while (length > 0) {
if (random().nextInt(10) == 0) { if (random().nextInt(10) == 0) {
verifyingOutput.writeByte(indexInput.readByte()); verifyingOutput.writeByte(indexInput.readByte());
@ -141,7 +141,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
public void testVerifyingIndexOutputWithBogusInput() throws IOException { public void testVerifyingIndexOutputWithBogusInput() throws IOException {
Directory dir = newDirectory(); Directory dir = newDirectory();
int length = scaledRandomIntBetween(10, 1024); int length = scaledRandomIntBetween(10, 1024);
IndexOutput verifyingOutput = new Store.VerifyingIndexOutput(new StoreFileMetaData("foo1.bar", length, ""), dir.createOutput("foo1.bar", IOContext.DEFAULT)); IndexOutput verifyingOutput = new Store.LuceneVerifyingIndexOutput(new StoreFileMetaData("foo1.bar", length, ""), dir.createOutput("foo1.bar", IOContext.DEFAULT));
try { try {
while (length > 0) { while (length > 0) {
verifyingOutput.writeByte((byte) random().nextInt()); verifyingOutput.writeByte((byte) random().nextInt());

View File

@ -0,0 +1,127 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store;
import java.nio.charset.StandardCharsets;
import java.util.zip.Adler32;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.Directory;
import org.elasticsearch.test.ElasticsearchLuceneTestCase;
/**
* Simple tests for LegacyVerification (old segments)
* @deprecated remove this test when support for lucene 4.x
* segments is not longer needed.
*/
@Deprecated
public class TestLegacyVerification extends ElasticsearchLuceneTestCase {
public void testAdler32() throws Exception {
Adler32 expected = new Adler32();
byte bytes[] = "abcdefgh".getBytes(StandardCharsets.UTF_8);
expected.update(bytes);
String expectedString = Store.digestToString(expected.getValue());
Directory dir = newDirectory();
IndexOutput o = dir.createOutput("legacy", IOContext.DEFAULT);
VerifyingIndexOutput out = new LegacyVerification.Adler32VerifyingIndexOutput(o, expectedString, 8);
out.writeBytes(bytes, 0, bytes.length);
out.verify();
out.close();
out.verify();
dir.close();
}
public void testAdler32Corrupt() throws Exception {
Adler32 expected = new Adler32();
byte bytes[] = "abcdefgh".getBytes(StandardCharsets.UTF_8);
expected.update(bytes);
String expectedString = Store.digestToString(expected.getValue());
byte corruptBytes[] = "abcdefch".getBytes(StandardCharsets.UTF_8);
Directory dir = newDirectory();
IndexOutput o = dir.createOutput("legacy", IOContext.DEFAULT);
VerifyingIndexOutput out = new LegacyVerification.Adler32VerifyingIndexOutput(o, expectedString, 8);
out.writeBytes(corruptBytes, 0, bytes.length);
try {
out.verify();
fail();
} catch (CorruptIndexException e) {
// expected exception
}
out.close();
try {
out.verify();
fail();
} catch (CorruptIndexException e) {
// expected exception
}
dir.close();
}
public void testLengthOnlyOneByte() throws Exception {
Directory dir = newDirectory();
IndexOutput o = dir.createOutput("oneByte", IOContext.DEFAULT);
VerifyingIndexOutput out = new LegacyVerification.LengthVerifyingIndexOutput(o, 1);
out.writeByte((byte) 3);
out.verify();
out.close();
out.verify();
dir.close();
}
public void testLengthOnlyCorrupt() throws Exception {
Directory dir = newDirectory();
IndexOutput o = dir.createOutput("oneByte", IOContext.DEFAULT);
VerifyingIndexOutput out = new LegacyVerification.LengthVerifyingIndexOutput(o, 2);
out.writeByte((byte) 3);
try {
out.verify();
fail();
} catch (CorruptIndexException expected) {
// expected exception
}
out.close();
try {
out.verify();
fail();
} catch (CorruptIndexException expected) {
// expected exception
}
dir.close();
}
}