better logging in case of state persistence failure

This commit is contained in:
Shay Banon 2011-08-04 12:09:21 +03:00
parent 1e6dbc5ff3
commit cbb95dee17
2 changed files with 152 additions and 85 deletions

View File

@ -0,0 +1,44 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.thread;
import org.elasticsearch.common.logging.ESLogger;
/**
*/
public class LoggingRunnable implements Runnable {
private final Runnable runnable;
private final ESLogger logger;
public LoggingRunnable(ESLogger logger, Runnable runnable) {
this.runnable = runnable;
this.logger = logger;
}
@Override public void run() {
try {
runnable.run();
} catch (Exception e) {
logger.warn("failed to execute [{}]", e, runnable.toString());
}
}
}

View File

@ -43,6 +43,7 @@ import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.LZFStreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.thread.LoggingRunnable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -193,52 +194,7 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
// we only write the local metadata if this is a possible master node
if (event.state().nodes().localNode().masterNode() && event.metaDataChanged()) {
executor.execute(new Runnable() {
@Override public void run() {
LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder();
if (currentMetaState != null) {
builder.state(currentMetaState);
}
final long version = event.state().metaData().version();
builder.version(version);
builder.metaData(event.state().metaData());
try {
File stateFile = new File(location, "metadata-" + version);
OutputStream fos = new FileOutputStream(stateFile);
if (compress) {
fos = new LZFOutputStream(fos);
}
LocalGatewayMetaState stateToWrite = builder.build();
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos);
if (prettyPrint) {
xContentBuilder.prettyPrint();
}
xContentBuilder.startObject();
LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject();
xContentBuilder.close();
fos.close();
FileSystemUtils.syncFile(stateFile);
currentMetaState = stateToWrite;
// delete all the other files
File[] files = location.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.startsWith("metadata-") && !name.equals("metadata-" + version);
}
});
for (File file : files) {
file.delete();
}
} catch (IOException e) {
logger.warn("failed to write updated state", e);
}
}
});
executor.execute(new LoggingRunnable(logger, new PersistMetaData(event)));
}
if (event.state().nodes().localNode().dataNode() && event.routingTableChanged()) {
@ -282,45 +238,7 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
// only write if something changed...
if (changed) {
final LocalGatewayStartedShards stateToWrite = builder.build();
executor.execute(new Runnable() {
@Override public void run() {
try {
File stateFile = new File(location, "shards-" + event.state().version());
OutputStream fos = new FileOutputStream(stateFile);
if (compress) {
fos = new LZFOutputStream(fos);
}
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos);
if (prettyPrint) {
xContentBuilder.prettyPrint();
}
xContentBuilder.startObject();
LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject();
xContentBuilder.close();
fos.close();
FileSystemUtils.syncFile(stateFile);
currentStartedShards = stateToWrite;
} catch (IOException e) {
logger.warn("failed to write updated state", e);
return;
}
// delete all the other files
File[] files = location.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.startsWith("shards-") && !name.equals("shards-" + event.state().version());
}
});
for (File file : files) {
file.delete();
}
}
});
executor.execute(new LoggingRunnable(logger, new PersistShards(event, stateToWrite)));
}
}
}
@ -472,4 +390,109 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
}
}
}
class PersistMetaData implements Runnable {
private final ClusterChangedEvent event;
public PersistMetaData(ClusterChangedEvent event) {
this.event = event;
}
@Override public void run() {
LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder();
if (currentMetaState != null) {
builder.state(currentMetaState);
}
final long version = event.state().metaData().version();
builder.version(version);
builder.metaData(event.state().metaData());
try {
File stateFile = new File(location, "metadata-" + version);
OutputStream fos = new FileOutputStream(stateFile);
if (compress) {
fos = new LZFOutputStream(fos);
}
LocalGatewayMetaState stateToWrite = builder.build();
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos);
if (prettyPrint) {
xContentBuilder.prettyPrint();
}
xContentBuilder.startObject();
LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject();
xContentBuilder.close();
fos.close();
FileSystemUtils.syncFile(stateFile);
currentMetaState = stateToWrite;
// delete all the other files
File[] files = location.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.startsWith("metadata-") && !name.equals("metadata-" + version);
}
});
if (files != null) {
for (File file : files) {
file.delete();
}
}
} catch (IOException e) {
logger.warn("failed to write updated state", e);
}
}
}
class PersistShards implements Runnable {
private final ClusterChangedEvent event;
private final LocalGatewayStartedShards stateToWrite;
public PersistShards(ClusterChangedEvent event, LocalGatewayStartedShards stateToWrite) {
this.event = event;
this.stateToWrite = stateToWrite;
}
@Override public void run() {
try {
File stateFile = new File(location, "shards-" + event.state().version());
OutputStream fos = new FileOutputStream(stateFile);
if (compress) {
fos = new LZFOutputStream(fos);
}
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos);
if (prettyPrint) {
xContentBuilder.prettyPrint();
}
xContentBuilder.startObject();
LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject();
xContentBuilder.close();
fos.close();
FileSystemUtils.syncFile(stateFile);
currentStartedShards = stateToWrite;
} catch (IOException e) {
logger.warn("failed to write updated state", e);
return;
}
// delete all the other files
File[] files = location.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.startsWith("shards-") && !name.equals("shards-" + event.state().version());
}
});
if (files != null) {
for (File file : files) {
file.delete();
}
}
}
}
}