fix a bug in new checksum mechanism that caused for replicas not to retain the _checksums file. Also, now that checksums are widely used, consider files without checksums as ones that need to be recovered
This commit is contained in:
parent
64a89eb0e4
commit
953a99c75c
|
@ -41,29 +41,6 @@ import static org.elasticsearch.common.io.FileSystemUtils.*;
|
|||
*/
|
||||
public class Directories {
|
||||
|
||||
/**
|
||||
* Deletes all the files from a directory.
|
||||
*
|
||||
* @param directory The directory to delete all the files from
|
||||
* @throws IOException if an exception occurs during the delete process
|
||||
*/
|
||||
public static void deleteFiles(Directory directory) throws IOException {
|
||||
String[] files = directory.listAll();
|
||||
IOException lastException = null;
|
||||
for (String file : files) {
|
||||
try {
|
||||
directory.deleteFile(file);
|
||||
} catch (FileNotFoundException e) {
|
||||
// ignore
|
||||
} catch (IOException e) {
|
||||
lastException = e;
|
||||
}
|
||||
}
|
||||
if (lastException != null) {
|
||||
throw lastException;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the estimated size of a {@link Directory}.
|
||||
*/
|
||||
|
|
|
@ -62,10 +62,10 @@ public class CommitPoint {
|
|||
}
|
||||
|
||||
public boolean isSame(StoreFileMetaData md) {
|
||||
if (checksum != null && md.checksum() != null) {
|
||||
return checksum.equals(md.checksum());
|
||||
if (checksum == null || md.checksum() == null) {
|
||||
return false;
|
||||
}
|
||||
return length == md.length();
|
||||
return length == md.length() && checksum.equals(md.checksum());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -93,7 +93,8 @@ public class RecoverySource extends AbstractComponent {
|
|||
this.translogSize = componentSettings.getAsBytesSize("translog_size", new ByteSizeValue(100, ByteSizeUnit.KB));
|
||||
this.compress = componentSettings.getAsBoolean("compress", true);
|
||||
|
||||
logger.debug("using concurrent_streams [{}], file_chunk_size [{}], translog_size [{}], translog_ops [{}], and compress [{}]", concurrentStreams, fileChunkSize, translogOps, translogSize, compress);
|
||||
logger.debug("using concurrent_streams [{}], file_chunk_size [{}], translog_size [{}], translog_ops [{}], and compress [{}]",
|
||||
concurrentStreams, fileChunkSize, translogSize, translogOps, compress);
|
||||
|
||||
transportService.registerHandler(Actions.START_RECOVERY, new StartRecoveryTransportRequestHandler());
|
||||
}
|
||||
|
|
|
@ -66,10 +66,10 @@ public class StoreFileMetaData implements Streamable {
|
|||
}
|
||||
|
||||
public boolean isSame(StoreFileMetaData other) {
|
||||
if (checksum != null && other.checksum != null) {
|
||||
return checksum.equals(other.checksum);
|
||||
if (checksum == null || other.checksum == null) {
|
||||
return false;
|
||||
}
|
||||
return length == other.length;
|
||||
return length == other.length && checksum.equals(other.checksum);
|
||||
}
|
||||
|
||||
public static StoreFileMetaData readStoreFileMetaData(StreamInput in) throws IOException {
|
||||
|
|
|
@ -46,6 +46,8 @@ import java.util.zip.Checksum;
|
|||
*/
|
||||
public abstract class AbstractStore extends AbstractIndexShardComponent implements Store {
|
||||
|
||||
static final String CHECKSUMS_PREFIX = "_checksums-";
|
||||
|
||||
protected final IndexStore indexStore;
|
||||
|
||||
private volatile ImmutableMap<String, StoreFileMetaData> filesMetadata = ImmutableMap.of();
|
||||
|
@ -90,7 +92,24 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
|
|||
}
|
||||
|
||||
@Override public void deleteContent() throws IOException {
|
||||
Directories.deleteFiles(directory());
|
||||
String[] files = directory().listAll();
|
||||
IOException lastException = null;
|
||||
for (String file : files) {
|
||||
if (file.startsWith(CHECKSUMS_PREFIX)) {
|
||||
((StoreDirectory) directory()).deleteFileChecksum(file);
|
||||
} else {
|
||||
try {
|
||||
directory().deleteFile(file);
|
||||
} catch (FileNotFoundException e) {
|
||||
// ignore
|
||||
} catch (IOException e) {
|
||||
lastException = e;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (lastException != null) {
|
||||
throw lastException;
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void fullDelete() throws IOException {
|
||||
|
@ -104,10 +123,10 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
|
|||
public static Map<String, String> readChecksums(Directory dir) throws IOException {
|
||||
long lastFound = -1;
|
||||
for (String name : dir.listAll()) {
|
||||
if (!name.startsWith("_checksums-")) {
|
||||
if (!name.startsWith(CHECKSUMS_PREFIX)) {
|
||||
continue;
|
||||
}
|
||||
long current = Long.parseLong(name.substring("_checksums-".length()));
|
||||
long current = Long.parseLong(name.substring(CHECKSUMS_PREFIX.length()));
|
||||
if (current > lastFound) {
|
||||
lastFound = current;
|
||||
}
|
||||
|
@ -115,7 +134,7 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
|
|||
if (lastFound == -1) {
|
||||
return ImmutableMap.of();
|
||||
}
|
||||
IndexInput indexInput = dir.openInput("_checksums-" + lastFound);
|
||||
IndexInput indexInput = dir.openInput(CHECKSUMS_PREFIX + lastFound);
|
||||
try {
|
||||
indexInput.readInt(); // version
|
||||
return indexInput.readStringStringMap();
|
||||
|
@ -129,7 +148,7 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
|
|||
}
|
||||
|
||||
private void writeChecksums(StoreDirectory dir) throws IOException {
|
||||
String checksumName = "_checksums-" + System.currentTimeMillis();
|
||||
String checksumName = CHECKSUMS_PREFIX + System.currentTimeMillis();
|
||||
ImmutableMap<String, StoreFileMetaData> files = list();
|
||||
synchronized (mutex) {
|
||||
Map<String, String> checksums = new HashMap<String, String>();
|
||||
|
@ -144,9 +163,9 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
|
|||
output.close();
|
||||
}
|
||||
for (StoreFileMetaData metaData : files.values()) {
|
||||
if (metaData.name().startsWith("_checksums") && !checksumName.equals(metaData.name())) {
|
||||
if (metaData.name().startsWith(CHECKSUMS_PREFIX) && !checksumName.equals(metaData.name())) {
|
||||
try {
|
||||
directory().deleteFile(metaData.name());
|
||||
dir.deleteFileChecksum(metaData.name());
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
|
@ -255,7 +274,19 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
|
|||
}
|
||||
}
|
||||
|
||||
public void deleteFileChecksum(String name) throws IOException {
|
||||
delegate.deleteFile(name);
|
||||
synchronized (mutex) {
|
||||
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap();
|
||||
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void deleteFile(String name) throws IOException {
|
||||
// we don't allow to delete the checksums files, only using the deleteChecksum method
|
||||
if (name.startsWith(CHECKSUMS_PREFIX)) {
|
||||
return;
|
||||
}
|
||||
delegate.deleteFile(name);
|
||||
synchronized (mutex) {
|
||||
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap();
|
||||
|
|
Loading…
Reference in New Issue