Improve handling of multiple data locations when reading checksums and local gateway state files by going through all the locations to find them.
commit f74793c92a
parent 49d976fc41
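The gateway side of this change follows one pattern throughout: instead of assuming that state lives under the first data location, every directory returned by NodeEnvironment.nodeDataLocations() is scanned for a _state directory, and the state file with the highest version wins. The sketch below is a minimal standalone illustration of that selection step, not the committed code: the dataLocations parameter and the LatestStateFinder class are hypothetical, and the real methods additionally parse and validate each candidate file (readMetaState / readStartedShards) before accepting it.

import java.io.File;

class LatestStateFinder {

    // Return the "<prefix><version>" file with the highest version found in any
    // data location's _state directory, or null if none exists.
    static File findLatest(File[] dataLocations, String prefix) {
        long highestVersion = -1;
        File latest = null;
        for (File dataLocation : dataLocations) {
            File stateLocation = new File(dataLocation, "_state");
            File[] stateFiles = stateLocation.listFiles();
            if (stateFiles == null) { // directory missing or unreadable
                continue;
            }
            for (File stateFile : stateFiles) {
                String name = stateFile.getName();
                if (!name.startsWith(prefix)) {
                    continue;
                }
                try {
                    long version = Long.parseLong(name.substring(prefix.length()));
                    if (version > highestVersion) {
                        highestVersion = version;
                        latest = stateFile;
                    }
                } catch (NumberFormatException e) {
                    // not a versioned state file, skip it
                }
            }
        }
        return latest;
    }
}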
@@ -73,7 +73,7 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
  */
 public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements Gateway, ClusterStateListener {
 
-    private File location;
+    private boolean requiresStatePersistence;
 
     private final ClusterService clusterService;
 
@@ -184,8 +184,7 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
     }
 
     @Override public void clusterChanged(final ClusterChangedEvent event) {
-        // the location is set to null, so we should not store it (for example, its not a data/master node)
-        if (location == null) {
+        if (!requiresStatePersistence) {
             return;
         }
 
@@ -260,20 +259,18 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
 
         // if this is not a possible master node or data node, bail, we won't save anything here...
         if (!clusterService.localNode().masterNode() && !clusterService.localNode().dataNode()) {
-            location = null;
+            requiresStatePersistence = false;
         } else {
             // create the location where the state will be stored
             // TODO: we might want to persist states on all data locations
-            this.location = new File(nodeEnv.nodeDataLocations()[0], "_state");
-            FileSystemUtils.mkdirs(this.location);
+            requiresStatePersistence = true;
 
             if (clusterService.localNode().masterNode()) {
                 try {
-                    long version = findLatestMetaStateVersion();
-                    if (version != -1) {
-                        File file = new File(location, "metadata-" + version);
-                        logger.debug("[find_latest_state]: loading metadata from [{}]", file.getAbsolutePath());
-                        this.currentMetaState = readMetaState(Streams.copyToByteArray(new FileInputStream(file)));
+                    File latest = findLatestMetaStateVersion();
+                    if (latest != null) {
+                        logger.debug("[find_latest_state]: loading metadata from [{}]", latest.getAbsolutePath());
+                        this.currentMetaState = readMetaState(Streams.copyToByteArray(new FileInputStream(latest)));
                     } else {
                         logger.debug("[find_latest_state]: no metadata state loaded");
                     }
@@ -284,11 +281,10 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
 
             if (clusterService.localNode().dataNode()) {
                 try {
-                    long version = findLatestStartedShardsVersion();
-                    if (version != -1) {
-                        File file = new File(location, "shards-" + version);
-                        logger.debug("[find_latest_state]: loading started shards from [{}]", file.getAbsolutePath());
-                        this.currentStartedShards = readStartedShards(Streams.copyToByteArray(new FileInputStream(file)));
+                    File latest = findLatestStartedShardsVersion();
+                    if (latest != null) {
+                        logger.debug("[find_latest_state]: loading started shards from [{}]", latest.getAbsolutePath());
+                        this.currentStartedShards = readStartedShards(Streams.copyToByteArray(new FileInputStream(latest)));
                     } else {
                         logger.debug("[find_latest_state]: no started shards loaded");
                     }
@@ -299,9 +295,19 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
         }
     }
 
-    private long findLatestStartedShardsVersion() throws IOException {
+    private File findLatestStartedShardsVersion() throws IOException {
         long index = -1;
-        for (File stateFile : location.listFiles()) {
+        File latest = null;
+        for (File dataLocation : nodeEnv.nodeDataLocations()) {
+            File stateLocation = new File(dataLocation, "_state");
+            if (!stateLocation.exists()) {
+                continue;
+            }
+            File[] stateFiles = stateLocation.listFiles();
+            if (stateFiles == null) {
+                continue;
+            }
+            for (File stateFile : stateFiles) {
             if (logger.isTraceEnabled()) {
                 logger.trace("[find_latest_state]: processing [" + stateFile.getName() + "]");
             }
@@ -319,18 +325,29 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
                     }
                     readStartedShards(data);
                     index = fileIndex;
+                    latest = stateFile;
                 } catch (IOException e) {
                     logger.warn("[find_latest_state]: failed to read state from [" + name + "], ignoring...", e);
                 }
             }
         }
-        return index;
+        }
+        return latest;
     }
 
-    private long findLatestMetaStateVersion() throws IOException {
+    private File findLatestMetaStateVersion() throws IOException {
         long index = -1;
-        for (File stateFile : location.listFiles()) {
+        File latest = null;
+        for (File dataLocation : nodeEnv.nodeDataLocations()) {
+            File stateLocation = new File(dataLocation, "_state");
+            if (!stateLocation.exists()) {
+                continue;
+            }
+            File[] stateFiles = stateLocation.listFiles();
+            if (stateFiles == null) {
+                continue;
+            }
+            for (File stateFile : stateFiles) {
             if (logger.isTraceEnabled()) {
                 logger.trace("[find_latest_state]: processing [" + stateFile.getName() + "]");
             }
@@ -349,13 +366,14 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
                     }
                     readMetaState(data);
                     index = fileIndex;
+                    latest = stateFile;
                 } catch (IOException e) {
                     logger.warn("[find_latest_state]: failed to read state from [" + name + "], ignoring...", e);
                 }
             }
         }
-        return index;
+        }
+        return latest;
     }
 
     private LocalGatewayMetaState readMetaState(byte[] data) throws IOException {
@@ -411,7 +429,11 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
             builder.metaData(event.state().metaData());
 
             try {
-                File stateFile = new File(location, "metadata-" + version);
+                File stateLocation = new File(nodeEnv.nodeDataLocations()[0], "_state");
+                if (!stateLocation.exists()) {
+                    FileSystemUtils.mkdirs(stateLocation);
+                }
+                File stateFile = new File(stateLocation, "metadata-" + version);
                 OutputStream fos = new FileOutputStream(stateFile);
                 if (compress) {
                     fos = new LZFOutputStream(fos);
@@ -432,7 +454,12 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
                 currentMetaState = stateToWrite;
 
                 // delete all the other files
-                File[] files = location.listFiles(new FilenameFilter() {
+                for (File dataLocation : nodeEnv.nodeDataLocations()) {
+                    stateLocation = new File(dataLocation, "_state");
+                    if (!stateLocation.exists()) {
+                        continue;
+                    }
+                    File[] files = stateLocation.listFiles(new FilenameFilter() {
                     @Override public boolean accept(File dir, String name) {
                         return name.startsWith("metadata-") && !name.equals("metadata-" + version);
                     }
@@ -442,6 +469,7 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
                         file.delete();
                     }
                 }
+                }
 
             } catch (IOException e) {
                 logger.warn("failed to write updated state", e);
@@ -461,7 +489,11 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
 
             @Override public void run() {
                 try {
-                    File stateFile = new File(location, "shards-" + event.state().version());
+                    File stateLocation = new File(nodeEnv.nodeDataLocations()[0], "_state");
+                    if (!stateLocation.exists()) {
+                        FileSystemUtils.mkdirs(stateLocation);
+                    }
+                    File stateFile = new File(stateLocation, "shards-" + event.state().version());
                     OutputStream fos = new FileOutputStream(stateFile);
                     if (compress) {
                         fos = new LZFOutputStream(fos);
@@ -487,7 +519,12 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
                     }
 
                     // delete all the other files
-                    File[] files = location.listFiles(new FilenameFilter() {
+                    for (File dataLocation : nodeEnv.nodeDataLocations()) {
+                        File stateLocation = new File(dataLocation, "_state");
+                        if (!stateLocation.exists()) {
+                            continue;
+                        }
+                        File[] files = stateLocation.listFiles(new FilenameFilter() {
                         @Override public boolean accept(File dir, String name) {
                             return name.startsWith("shards-") && !name.equals("shards-" + event.state().version());
                         }
@@ -499,4 +536,5 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
                         }
                     }
                 }
+                }
             }

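Writes still go to the first data location only (note the nodeDataLocations()[0] in the hunks above), but the cleanup of superseded metadata-* and shards-* files now walks every data location, since older versions may be left behind anywhere. The following is a hedged sketch of that pruning step, with a hypothetical dataLocations array and StaleStateCleaner class standing in for the node environment and the inline code in LocalGateway:

import java.io.File;
import java.io.FilenameFilter;

class StaleStateCleaner {

    // Delete every "<prefix><version>" state file except the one for currentVersion,
    // looking in the _state directory of every data location.
    static void deleteStaleState(File[] dataLocations, final String prefix, final long currentVersion) {
        for (File dataLocation : dataLocations) {
            File stateLocation = new File(dataLocation, "_state");
            if (!stateLocation.exists()) {
                continue;
            }
            File[] files = stateLocation.listFiles(new FilenameFilter() {
                @Override public boolean accept(File dir, String name) {
                    return name.startsWith(prefix) && !name.equals(prefix + currentVersion);
                }
            });
            if (files == null) {
                continue;
            }
            for (File file : files) {
                file.delete();
            }
        }
    }
}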
@@ -25,8 +25,8 @@ import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.LockFactory;
+import org.apache.lucene.store.SimpleFSDirectory;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.Unicode;
 import org.elasticsearch.common.collect.ImmutableList;
 import org.elasticsearch.common.collect.ImmutableMap;
 import org.elasticsearch.common.collect.MapBuilder;
|
@ -164,22 +164,29 @@ public class Store extends AbstractIndexShardComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Map<String, String> readChecksums(File[] locations) throws IOException {
|
public static Map<String, String> readChecksums(File[] locations) throws IOException {
|
||||||
for (File location : locations) {
|
Directory[] dirs = new Directory[locations.length];
|
||||||
FSDirectory directory = FSDirectory.open(location);
|
|
||||||
try {
|
try {
|
||||||
Map<String, String> checksums = readChecksums(directory, null);
|
for (int i = 0; i < locations.length; i++) {
|
||||||
if (checksums != null) {
|
dirs[i] = new SimpleFSDirectory(locations[i]);
|
||||||
return checksums;
|
|
||||||
}
|
}
|
||||||
|
return readChecksums(dirs, null);
|
||||||
} finally {
|
} finally {
|
||||||
directory.close();
|
for (Directory dir : dirs) {
|
||||||
|
if (dir != null) {
|
||||||
|
try {
|
||||||
|
dir.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static Map<String, String> readChecksums(Directory dir, Map<String, String> defaultValue) throws IOException {
|
static Map<String, String> readChecksums(Directory[] dirs, Map<String, String> defaultValue) throws IOException {
|
||||||
long lastFound = -1;
|
long lastFound = -1;
|
||||||
|
Directory lastDir = null;
|
||||||
|
for (Directory dir : dirs) {
|
||||||
for (String name : dir.listAll()) {
|
for (String name : dir.listAll()) {
|
||||||
if (!isChecksum(name)) {
|
if (!isChecksum(name)) {
|
||||||
continue;
|
continue;
|
||||||
|
@@ -187,12 +194,14 @@ public class Store extends AbstractIndexShardComponent {
                 long current = Long.parseLong(name.substring(CHECKSUMS_PREFIX.length()));
                 if (current > lastFound) {
                     lastFound = current;
+                    lastDir = dir;
+                }
             }
         }
         if (lastFound == -1) {
             return defaultValue;
         }
-        IndexInput indexInput = dir.openInput(CHECKSUMS_PREFIX + lastFound);
+        IndexInput indexInput = lastDir.openInput(CHECKSUMS_PREFIX + lastFound);
         try {
             indexInput.readInt(); // version
             return indexInput.readStringStringMap();
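For the store, readChecksums now takes several Lucene Directory instances, picks the newest checksums-<timestamp> file across all of them, and remembers which directory it came from so the file can be opened there. The following is a rough standalone reassembly of that pattern, assuming the Lucene 3.x-era APIs used in the diff (Directory.listAll, openInput(String), IndexInput.readStringStringMap); the ChecksumReader class is hypothetical, and the isChecksum() helper from the real class is replaced by a plain prefix check here.

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;

import java.io.IOException;
import java.util.Map;

class ChecksumReader {

    static final String CHECKSUMS_PREFIX = "checksums-";

    // Scan every directory for checksum files, keep the newest one and the
    // directory it lives in, then read it; fall back to defaultValue if none is found.
    static Map<String, String> readChecksums(Directory[] dirs, Map<String, String> defaultValue) throws IOException {
        long lastFound = -1;
        Directory lastDir = null;
        for (Directory dir : dirs) {
            for (String name : dir.listAll()) {
                if (!name.startsWith(CHECKSUMS_PREFIX)) {
                    continue;
                }
                long current = Long.parseLong(name.substring(CHECKSUMS_PREFIX.length()));
                if (current > lastFound) {
                    lastFound = current;
                    lastDir = dir;
                }
            }
        }
        if (lastFound == -1) {
            return defaultValue;
        }
        IndexInput indexInput = lastDir.openInput(CHECKSUMS_PREFIX + lastFound);
        try {
            indexInput.readInt(); // version marker written alongside the checksums
            return indexInput.readStringStringMap();
        } finally {
            indexInput.close();
        }
    }
}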
@@ -205,10 +214,6 @@ public class Store extends AbstractIndexShardComponent {
     }
 
     public void writeChecksums() throws IOException {
-        writeChecksums(directory);
-    }
-
-    private void writeChecksums(StoreDirectory dir) throws IOException {
         String checksumName = CHECKSUMS_PREFIX + System.currentTimeMillis();
         ImmutableMap<String, StoreFileMetaData> files = list();
         synchronized (mutex) {
@@ -218,7 +223,7 @@ public class Store extends AbstractIndexShardComponent {
                 checksums.put(metaData.name(), metaData.checksum());
             }
         }
-        IndexOutput output = dir.createOutput(checksumName, false);
+        IndexOutput output = directory.createOutput(checksumName, false);
         output.writeInt(0); // version
         output.writeStringStringMap(checksums);
         output.close();
@@ -226,7 +231,7 @@ public class Store extends AbstractIndexShardComponent {
         for (StoreFileMetaData metaData : files.values()) {
             if (metaData.name().startsWith(CHECKSUMS_PREFIX) && !checksumName.equals(metaData.name())) {
                 try {
-                    dir.deleteFileChecksum(metaData.name());
+                    directory.deleteFileChecksum(metaData.name());
                 } catch (Exception e) {
                     // ignore
                 }
@@ -282,30 +287,10 @@ public class Store extends AbstractIndexShardComponent {
             this.delegates = delegates;
             synchronized (mutex) {
                 MapBuilder<String, StoreFileMetaData> builder = MapBuilder.newMapBuilder();
-                Map<String, String> checksums = readChecksums(delegates[0], new HashMap<String, String>());
+                Map<String, String> checksums = readChecksums(delegates, new HashMap<String, String>());
                 for (Directory delegate : delegates) {
                     for (String file : delegate.listAll()) {
-                        // BACKWARD CKS SUPPORT
-                        if (file.endsWith(".cks")) { // ignore checksum files here
-                            continue;
-                        }
                         String checksum = checksums.get(file);
-
-                        // BACKWARD CKS SUPPORT
-                        if (checksum == null) {
-                            if (delegate.fileExists(file + ".cks")) {
-                                IndexInput indexInput = delegate.openInput(file + ".cks");
-                                try {
-                                    if (indexInput.length() > 0) {
-                                        byte[] checksumBytes = new byte[(int) indexInput.length()];
-                                        indexInput.readBytes(checksumBytes, 0, checksumBytes.length, false);
-                                        checksum = Unicode.fromBytes(checksumBytes);
-                                    }
-                                } finally {
-                                    indexInput.close();
-                                }
-                            }
-                        }
                         builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file), checksum, delegate));
                     }
                 }
@@ -350,13 +335,16 @@ public class Store extends AbstractIndexShardComponent {
         }
 
         public void deleteFileChecksum(String name) throws IOException {
+            StoreFileMetaData metaData = filesMetadata.get(name);
+            if (metaData != null) {
             try {
-                delegates[0].deleteFile(name);
+                metaData.directory().deleteFile(name);
             } catch (IOException e) {
-                if (delegates[0].fileExists(name)) {
+                if (metaData.directory().fileExists(name)) {
                     throw e;
                 }
             }
+            }
             synchronized (mutex) {
                 filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap();
                 files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);