optimize writing gateway data and reuse streams (if needed)

This commit is contained in:
kimchy 2011-05-24 15:31:50 +03:00
parent 68a56a0b58
commit 4711be7061
3 changed files with 57 additions and 38 deletions

View File

@ -106,7 +106,8 @@ public class LZFStreamOutput extends StreamOutput {
public void close() throws IOException { public void close() throws IOException {
flush(); flush();
if (neverClose) { if (neverClose) {
reset(); // just reset here the LZF stream (not the underlying stream, since we might want to read from it)
_position = 0;
return; return;
} }
_outputStream.close(); _outputStream.close();

View File

@ -23,17 +23,26 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.*; import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.compress.lzf.LZF; import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.lzf.LZFEncoder;
import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamInput; import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.LZFStreamInput; import org.elasticsearch.common.io.stream.LZFStreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.*; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayException; import org.elasticsearch.gateway.GatewayException;
import org.elasticsearch.gateway.shared.SharedStorageGateway; import org.elasticsearch.gateway.shared.SharedStorageGateway;
import org.elasticsearch.index.gateway.CommitPoint; import org.elasticsearch.index.gateway.CommitPoint;
@ -142,28 +151,24 @@ public abstract class BlobStoreGateway extends SharedStorageGateway {
@Override public void write(MetaData metaData) throws GatewayException { @Override public void write(MetaData metaData) throws GatewayException {
final String newMetaData = "metadata-" + (currentIndex + 1); final String newMetaData = "metadata-" + (currentIndex + 1);
XContentBuilder builder; CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try { try {
builder = XContentFactory.contentBuilder(XContentType.JSON); StreamOutput out;
if (compress) {
out = cachedEntry.cachedLZFBytes();
} else {
out = cachedEntry.cachedBytes();
}
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, out);
builder.startObject(); builder.startObject();
MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS); MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS);
builder.endObject(); builder.endObject();
} catch (IOException e) { builder.close();
throw new GatewayException("Failed to serialize metadata into gateway", e); metaDataBlobContainer.writeBlob(newMetaData, new ByteArrayInputStream(cachedEntry.bytes().unsafeByteArray(), 0, cachedEntry.bytes().size()), cachedEntry.bytes().size());
}
try {
byte[] data = builder.unsafeBytes();
int size = builder.unsafeBytesLength();
if (compress) {
data = LZFEncoder.encode(data, size);
size = data.length;
}
metaDataBlobContainer.writeBlob(newMetaData, new ByteArrayInputStream(data, 0, size), size);
} catch (IOException e) { } catch (IOException e) {
throw new GatewayException("Failed to write metadata [" + newMetaData + "]", e); throw new GatewayException("Failed to write metadata [" + newMetaData + "]", e);
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
} }
currentIndex++; currentIndex++;

View File

@ -25,7 +25,11 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.lzf.LZF; import org.elasticsearch.common.compress.lzf.LZF;
@ -38,14 +42,23 @@ import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamInput; import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.LZFStreamInput; import org.elasticsearch.common.io.stream.LZFStreamInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException; import org.elasticsearch.gateway.GatewayException;
import org.elasticsearch.index.gateway.local.LocalIndexGatewayModule; import org.elasticsearch.index.gateway.local.LocalIndexGatewayModule;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import java.io.*; import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -189,21 +202,20 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
builder.metaData(event.state().metaData()); builder.metaData(event.state().metaData());
try { try {
File stateFile = new File(location, "metadata-" + event.state().version());
OutputStream fos = new FileOutputStream(stateFile);
if (compress) {
fos = new LZFOutputStream(fos);
}
LocalGatewayMetaState stateToWrite = builder.build(); LocalGatewayMetaState stateToWrite = builder.build();
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON); XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos);
if (prettyPrint) { if (prettyPrint) {
xContentBuilder.prettyPrint(); xContentBuilder.prettyPrint();
} }
xContentBuilder.startObject(); xContentBuilder.startObject();
LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject(); xContentBuilder.endObject();
xContentBuilder.close();
File stateFile = new File(location, "metadata-" + event.state().version());
OutputStream fos = new FileOutputStream(stateFile);
if (compress) {
fos = new LZFOutputStream(fos);
}
fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength());
fos.close(); fos.close();
FileSystemUtils.syncFile(stateFile); FileSystemUtils.syncFile(stateFile);
@ -265,21 +277,22 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
} }
try { try {
File stateFile = new File(location, "shards-" + event.state().version());
OutputStream fos = new FileOutputStream(stateFile);
if (compress) {
fos = new LZFOutputStream(fos);
}
LocalGatewayStartedShards stateToWrite = builder.build(); LocalGatewayStartedShards stateToWrite = builder.build();
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON); XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos);
if (prettyPrint) { if (prettyPrint) {
xContentBuilder.prettyPrint(); xContentBuilder.prettyPrint();
} }
xContentBuilder.startObject(); xContentBuilder.startObject();
LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject(); xContentBuilder.endObject();
xContentBuilder.close();
File stateFile = new File(location, "shards-" + event.state().version());
OutputStream fos = new FileOutputStream(stateFile);
if (compress) {
fos = new LZFOutputStream(fos);
}
fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength());
fos.close(); fos.close();
FileSystemUtils.syncFile(stateFile); FileSystemUtils.syncFile(stateFile);