diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java index 6167ac50b38..a4cf47761c4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java @@ -106,7 +106,8 @@ public class LZFStreamOutput extends StreamOutput { public void close() throws IOException { flush(); if (neverClose) { - reset(); + // just reset here the LZF stream (not the underlying stream, since we might want to read from it) + _position = 0; return; } _outputStream.close(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java index 256023c86ce..aa107453c06 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java @@ -23,17 +23,26 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.blobstore.*; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.blobstore.ImmutableBlobContainer; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.compress.lzf.LZF; -import org.elasticsearch.common.compress.lzf.LZFEncoder; import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.io.stream.CachedStreamInput; +import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.LZFStreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.gateway.GatewayException; import org.elasticsearch.gateway.shared.SharedStorageGateway; import org.elasticsearch.index.gateway.CommitPoint; @@ -142,28 +151,24 @@ public abstract class BlobStoreGateway extends SharedStorageGateway { @Override public void write(MetaData metaData) throws GatewayException { final String newMetaData = "metadata-" + (currentIndex + 1); - XContentBuilder builder; + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); try { - builder = XContentFactory.contentBuilder(XContentType.JSON); + StreamOutput out; + if (compress) { + out = cachedEntry.cachedLZFBytes(); + } else { + out = cachedEntry.cachedBytes(); + } + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, out); builder.startObject(); MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS); builder.endObject(); - } catch (IOException e) { - throw new GatewayException("Failed to serialize metadata into gateway", e); - } - - try { - byte[] data = builder.unsafeBytes(); - int size = builder.unsafeBytesLength(); - - if (compress) { - data = LZFEncoder.encode(data, size); - size = data.length; - } - - metaDataBlobContainer.writeBlob(newMetaData, new ByteArrayInputStream(data, 0, size), size); + builder.close(); + metaDataBlobContainer.writeBlob(newMetaData, new ByteArrayInputStream(cachedEntry.bytes().unsafeByteArray(), 0, cachedEntry.bytes().size()), cachedEntry.bytes().size()); } catch (IOException e) { throw new GatewayException("Failed to write metadata [" + newMetaData + "]", e); + } finally { + CachedStreamOutput.pushEntry(cachedEntry); } currentIndex++; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index 2621a70b5f4..83a24d0e2ed 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -25,7 +25,11 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.lzf.LZF; @@ -38,14 +42,23 @@ import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.io.stream.CachedStreamInput; import org.elasticsearch.common.io.stream.LZFStreamInput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.GatewayException; import org.elasticsearch.index.gateway.local.LocalIndexGatewayModule; import org.elasticsearch.index.shard.ShardId; -import java.io.*; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.OutputStream; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -189,21 +202,20 @@ public class LocalGateway extends AbstractLifecycleComponent implements builder.metaData(event.state().metaData()); try { + File stateFile = new File(location, "metadata-" + event.state().version()); + OutputStream fos = new FileOutputStream(stateFile); + if (compress) { + fos = new LZFOutputStream(fos); + } LocalGatewayMetaState stateToWrite = builder.build(); - XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON); + XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos); if (prettyPrint) { xContentBuilder.prettyPrint(); } xContentBuilder.startObject(); LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); xContentBuilder.endObject(); - - File stateFile = new File(location, "metadata-" + event.state().version()); - OutputStream fos = new FileOutputStream(stateFile); - if (compress) { - fos = new LZFOutputStream(fos); - } - fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength()); + xContentBuilder.close(); fos.close(); FileSystemUtils.syncFile(stateFile); @@ -265,21 +277,22 @@ public class LocalGateway extends AbstractLifecycleComponent implements } try { + File stateFile = new File(location, "shards-" + event.state().version()); + OutputStream fos = new FileOutputStream(stateFile); + if (compress) { + fos = new LZFOutputStream(fos); + } + LocalGatewayStartedShards stateToWrite = builder.build(); - XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON); + XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos); if (prettyPrint) { xContentBuilder.prettyPrint(); } xContentBuilder.startObject(); LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); xContentBuilder.endObject(); + xContentBuilder.close(); - File stateFile = new File(location, "shards-" + event.state().version()); - OutputStream fos = new FileOutputStream(stateFile); - if (compress) { - fos = new LZFOutputStream(fos); - } - fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength()); fos.close(); FileSystemUtils.syncFile(stateFile);