diff --git a/src/main/java/org/elasticsearch/index/store/LegacyVerification.java b/src/main/java/org/elasticsearch/index/store/LegacyVerification.java new file mode 100644 index 00000000000..d2b0a2815e9 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/store/LegacyVerification.java @@ -0,0 +1,123 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.store; + +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.BufferedChecksum; +import org.apache.lucene.store.IndexOutput; + +import java.io.IOException; +import java.util.zip.Adler32; +import java.util.zip.Checksum; + +/** + * Implements verification checks to the best extent possible + * against legacy segments. + *
+ * For files since ES 1.3, we have a lucene checksum, and + * we verify both CRC32 + length from that. + * For older segment files, we have an elasticsearch Adler32 checksum + * and a length, except for commit points. + * For older commit points, we only have the length in metadata, + * but lucene always wrote a CRC32 checksum we can verify in the future, too. + * For (Jurassic?) files, we dont have an Adler32 checksum at all, + * since its optional in the protocol. But we always know the length. + * @deprecated only to support old segments + */ +@Deprecated +class LegacyVerification { + + // TODO: add a verifier for old lucene segments_N that also checks CRC. + // but for now, at least truncation is detected here (as length will be checked) + + /** + * verifies Adler32 + length for index files before lucene 4.8 + */ + static class Adler32VerifyingIndexOutput extends VerifyingIndexOutput { + final String adler32; + final long length; + final Checksum checksum = new BufferedChecksum(new Adler32()); + long written; + + public Adler32VerifyingIndexOutput(IndexOutput out, String adler32, long length) { + super(out); + this.adler32 = adler32; + this.length = length; + } + + @Override + public void verify() throws IOException { + if (written != length) { + throw new CorruptIndexException("expected length=" + length + " != actual length: " + written + " : file truncated?", out.toString()); + } + final String actualChecksum = Store.digestToString(checksum.getValue()); + if (!adler32.equals(actualChecksum)) { + throw new CorruptIndexException("checksum failed (hardware problem?) : expected=" + adler32 + + " actual=" + actualChecksum, out.toString()); + } + } + + @Override + public void writeByte(byte b) throws IOException { + out.writeByte(b); + checksum.update(b); + written++; + } + + @Override + public void writeBytes(byte[] bytes, int offset, int length) throws IOException { + out.writeBytes(bytes, offset, length); + checksum.update(bytes, offset, length); + written += length; + } + } + + /** + * verifies length for index files before lucene 4.8 + */ + static class LengthVerifyingIndexOutput extends VerifyingIndexOutput { + final long length; + long written; + + public LengthVerifyingIndexOutput(IndexOutput out, long length) { + super(out); + this.length = length; + } + + @Override + public void verify() throws IOException { + if (written != length) { + throw new CorruptIndexException("expected length=" + length + " != actual length: " + written + " : file truncated?", out.toString()); + } + } + + @Override + public void writeByte(byte b) throws IOException { + out.writeByte(b); + written++; + } + + @Override + public void writeBytes(byte[] bytes, int offset, int length) throws IOException { + out.writeBytes(bytes, offset, length); + written += length; + } + } +} diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index 75e8c95edf5..5305095b6b5 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -386,12 +386,18 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex IndexOutput output = directory().createOutput(fileName, context); boolean success = false; try { - if (metadata.hasLegacyChecksum() || metadata.checksum() == null) { - logger.debug("create legacy output for {}", fileName); + if (metadata.hasLegacyChecksum()) { + logger.debug("create legacy adler32 output for {}", fileName); + output = new LegacyVerification.Adler32VerifyingIndexOutput(output, metadata.checksum(), metadata.length()); + } else if (metadata.checksum() == null) { + // TODO: when the file is a segments_N, we can still CRC-32 + length for more safety + // its had that checksum forever. + logger.debug("create legacy length-only output for {}", fileName); + output = new LegacyVerification.LengthVerifyingIndexOutput(output, metadata.length()); } else { assert metadata.writtenBy() != null; - assert metadata.writtenBy().onOrAfter(Version.LUCENE_4_8_0); - output = new VerifyingIndexOutput(metadata, output); + assert metadata.writtenBy().onOrAfter(Version.LUCENE_4_8); + output = new LuceneVerifyingIndexOutput(metadata, output); } success = true; } finally { @@ -855,39 +861,20 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex } - static class VerifyingIndexOutput extends IndexOutput { + static class LuceneVerifyingIndexOutput extends VerifyingIndexOutput { private final StoreFileMetaData metadata; - private final IndexOutput output; private long writtenBytes; private final long checksumPosition; private String actualChecksum; - VerifyingIndexOutput(StoreFileMetaData metadata, IndexOutput actualOutput) { + LuceneVerifyingIndexOutput(StoreFileMetaData metadata, IndexOutput out) { + super(out); this.metadata = metadata; - this.output = actualOutput; checksumPosition = metadata.length() - 8; // the last 8 bytes are the checksum } @Override - public void close() throws IOException { - output.close(); - } - - @Override - public long getFilePointer() { - return output.getFilePointer(); - } - - @Override - public long getChecksum() throws IOException { - return output.getChecksum(); - } - - /** - * Verifies the checksum and compares the written length with the expected file length. This method should bec - * called after all data has been written to this output. - */ public void verify() throws IOException { if (metadata.checksum().equals(actualChecksum) && writtenBytes == metadata.length()) { return; @@ -902,7 +889,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex if (writtenBytes++ == checksumPosition) { readAndCompareChecksum(); } - output.writeByte(b); + out.writeByte(b); } private void readAndCompareChecksum() throws IOException { @@ -919,13 +906,13 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex if (writtenBytes + length > checksumPosition && actualChecksum == null) { assert writtenBytes <= checksumPosition; final int bytesToWrite = (int)(checksumPosition-writtenBytes); - output.writeBytes(b, offset, bytesToWrite); + out.writeBytes(b, offset, bytesToWrite); readAndCompareChecksum(); offset += bytesToWrite; length -= bytesToWrite; writtenBytes += bytesToWrite; } - output.writeBytes(b, offset, length); + out.writeBytes(b, offset, length); writtenBytes += length; } diff --git a/src/main/java/org/elasticsearch/index/store/VerifyingIndexOutput.java b/src/main/java/org/elasticsearch/index/store/VerifyingIndexOutput.java new file mode 100644 index 00000000000..90ae940d0b4 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/store/VerifyingIndexOutput.java @@ -0,0 +1,61 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.store; + +import java.io.IOException; + +import org.apache.lucene.store.IndexOutput; + +/** + * abstract class for verifying what was written. + * subclasses override {@link #writeByte(byte)} and {@link #writeBytes(byte[], int, int)} + */ +// do NOT optimize this class for performance +public abstract class VerifyingIndexOutput extends IndexOutput { + protected final IndexOutput out; + + /** Sole constructor */ + VerifyingIndexOutput(IndexOutput out) { + this.out = out; + } + + /** + * Verifies the checksum and compares the written length with the expected file length. This method should be + * called after all data has been written to this output. + */ + public abstract void verify() throws IOException; + + // default implementations... forwarding to delegate + + @Override + public final void close() throws IOException { + out.close(); + } + + @Override + public final long getChecksum() throws IOException { + return out.getChecksum(); + } + + @Override + public final long getFilePointer() { + return out.getFilePointer(); + } +} diff --git a/src/test/java/org/elasticsearch/index/store/StoreTest.java b/src/test/java/org/elasticsearch/index/store/StoreTest.java index 73d07faf895..fe4090fe72b 100644 --- a/src/test/java/org/elasticsearch/index/store/StoreTest.java +++ b/src/test/java/org/elasticsearch/index/store/StoreTest.java @@ -114,7 +114,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase { indexInput.seek(0); BytesRef ref = new BytesRef(scaledRandomIntBetween(1, 1024)); long length = indexInput.length(); - IndexOutput verifyingOutput = new Store.VerifyingIndexOutput(new StoreFileMetaData("foo1.bar", length, checksum), dir.createOutput("foo1.bar", IOContext.DEFAULT)); + IndexOutput verifyingOutput = new Store.LuceneVerifyingIndexOutput(new StoreFileMetaData("foo1.bar", length, checksum), dir.createOutput("foo1.bar", IOContext.DEFAULT)); while (length > 0) { if (random().nextInt(10) == 0) { verifyingOutput.writeByte(indexInput.readByte()); @@ -141,7 +141,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase { public void testVerifyingIndexOutputWithBogusInput() throws IOException { Directory dir = newDirectory(); int length = scaledRandomIntBetween(10, 1024); - IndexOutput verifyingOutput = new Store.VerifyingIndexOutput(new StoreFileMetaData("foo1.bar", length, ""), dir.createOutput("foo1.bar", IOContext.DEFAULT)); + IndexOutput verifyingOutput = new Store.LuceneVerifyingIndexOutput(new StoreFileMetaData("foo1.bar", length, ""), dir.createOutput("foo1.bar", IOContext.DEFAULT)); try { while (length > 0) { verifyingOutput.writeByte((byte) random().nextInt()); diff --git a/src/test/java/org/elasticsearch/index/store/TestLegacyVerification.java b/src/test/java/org/elasticsearch/index/store/TestLegacyVerification.java new file mode 100644 index 00000000000..ee1d8b09306 --- /dev/null +++ b/src/test/java/org/elasticsearch/index/store/TestLegacyVerification.java @@ -0,0 +1,127 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.store; + +import java.nio.charset.StandardCharsets; +import java.util.zip.Adler32; + +import org.apache.lucene.index.CorruptIndexException; + +import org.apache.lucene.store.IndexOutput; + +import org.apache.lucene.store.IOContext; + +import org.apache.lucene.store.Directory; +import org.elasticsearch.test.ElasticsearchLuceneTestCase; + +/** + * Simple tests for LegacyVerification (old segments) + * @deprecated remove this test when support for lucene 4.x + * segments is not longer needed. + */ +@Deprecated +public class TestLegacyVerification extends ElasticsearchLuceneTestCase { + + public void testAdler32() throws Exception { + Adler32 expected = new Adler32(); + byte bytes[] = "abcdefgh".getBytes(StandardCharsets.UTF_8); + expected.update(bytes); + String expectedString = Store.digestToString(expected.getValue()); + + Directory dir = newDirectory(); + + IndexOutput o = dir.createOutput("legacy", IOContext.DEFAULT); + VerifyingIndexOutput out = new LegacyVerification.Adler32VerifyingIndexOutput(o, expectedString, 8); + out.writeBytes(bytes, 0, bytes.length); + out.verify(); + out.close(); + out.verify(); + + dir.close(); + } + + public void testAdler32Corrupt() throws Exception { + Adler32 expected = new Adler32(); + byte bytes[] = "abcdefgh".getBytes(StandardCharsets.UTF_8); + expected.update(bytes); + String expectedString = Store.digestToString(expected.getValue()); + + byte corruptBytes[] = "abcdefch".getBytes(StandardCharsets.UTF_8); + Directory dir = newDirectory(); + + IndexOutput o = dir.createOutput("legacy", IOContext.DEFAULT); + VerifyingIndexOutput out = new LegacyVerification.Adler32VerifyingIndexOutput(o, expectedString, 8); + out.writeBytes(corruptBytes, 0, bytes.length); + try { + out.verify(); + fail(); + } catch (CorruptIndexException e) { + // expected exception + } + out.close(); + + try { + out.verify(); + fail(); + } catch (CorruptIndexException e) { + // expected exception + } + + dir.close(); + } + + public void testLengthOnlyOneByte() throws Exception { + Directory dir = newDirectory(); + + IndexOutput o = dir.createOutput("oneByte", IOContext.DEFAULT); + VerifyingIndexOutput out = new LegacyVerification.LengthVerifyingIndexOutput(o, 1); + out.writeByte((byte) 3); + out.verify(); + out.close(); + out.verify(); + + dir.close(); + } + + public void testLengthOnlyCorrupt() throws Exception { + Directory dir = newDirectory(); + + IndexOutput o = dir.createOutput("oneByte", IOContext.DEFAULT); + VerifyingIndexOutput out = new LegacyVerification.LengthVerifyingIndexOutput(o, 2); + out.writeByte((byte) 3); + try { + out.verify(); + fail(); + } catch (CorruptIndexException expected) { + // expected exception + } + + out.close(); + + try { + out.verify(); + fail(); + } catch (CorruptIndexException expected) { + // expected exception + } + + dir.close(); + } +}