compress metadata stored in gateway

This commit is contained in:
kimchy 2011-03-24 00:38:16 +02:00
parent b8ac25c430
commit 36edcef640
8 changed files with 75 additions and 14 deletions

View File

@ -76,6 +76,10 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
protected abstract String transportNodeAction(); protected abstract String transportNodeAction();
protected boolean transportCompress() {
return false;
}
protected abstract String executor(); protected abstract String executor();
protected abstract Request newRequest(); protected abstract Request newRequest();
@ -135,6 +139,7 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
if (request.timeout() != null) { if (request.timeout() != null) {
transportRequestOptions.withTimeout(request.timeout()); transportRequestOptions.withTimeout(request.timeout());
} }
transportRequestOptions.withCompress(transportCompress());
for (final String nodeId : nodesIds) { for (final String nodeId : nodesIds) {
final DiscoveryNode node = clusterState.nodes().nodes().get(nodeId); final DiscoveryNode node = clusterState.nodes().nodes().get(nodeId);
if (nodeId.equals("_local") || nodeId.equals(clusterState.nodes().localNodeId())) { if (nodeId.equals("_local") || nodeId.equals(clusterState.nodes().localNodeId())) {
@ -220,8 +225,9 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
request.listenerThreaded(false); request.listenerThreaded(false);
execute(request, new ActionListener<Response>() { execute(request, new ActionListener<Response>() {
@Override public void onResponse(Response response) { @Override public void onResponse(Response response) {
TransportResponseOptions options = TransportResponseOptions.options().withCompress(transportCompress());
try { try {
channel.sendResponse(response); channel.sendResponse(response, options);
} catch (Exception e) { } catch (Exception e) {
onFailure(e); onFailure(e);
} }

View File

@ -26,6 +26,11 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.*; import org.elasticsearch.common.blobstore.*;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.compress.lzf.LZFEncoder;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.LZFStreamInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.*; import org.elasticsearch.common.xcontent.*;
@ -53,6 +58,8 @@ public abstract class BlobStoreGateway extends SharedStorageGateway {
private ImmutableBlobContainer metaDataBlobContainer; private ImmutableBlobContainer metaDataBlobContainer;
private boolean compress;
private volatile int currentIndex; private volatile int currentIndex;
protected BlobStoreGateway(Settings settings, ThreadPool threadPool, ClusterService clusterService) { protected BlobStoreGateway(Settings settings, ThreadPool threadPool, ClusterService clusterService) {
@ -65,6 +72,7 @@ public abstract class BlobStoreGateway extends SharedStorageGateway {
this.basePath = BlobPath.cleanPath().add(clusterName.value()); this.basePath = BlobPath.cleanPath().add(clusterName.value());
this.metaDataBlobContainer = blobStore.immutableBlobContainer(basePath.add("metadata")); this.metaDataBlobContainer = blobStore.immutableBlobContainer(basePath.add("metadata"));
this.currentIndex = findLatestIndex(); this.currentIndex = findLatestIndex();
this.compress = componentSettings.getAsBoolean("compress", true);
logger.debug("Latest metadata found at index [" + currentIndex + "]"); logger.debug("Latest metadata found at index [" + currentIndex + "]");
} }
@ -137,7 +145,6 @@ public abstract class BlobStoreGateway extends SharedStorageGateway {
XContentBuilder builder; XContentBuilder builder;
try { try {
builder = XContentFactory.contentBuilder(XContentType.JSON); builder = XContentFactory.contentBuilder(XContentType.JSON);
builder.prettyPrint();
builder.startObject(); builder.startObject();
MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS); MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS);
builder.endObject(); builder.endObject();
@ -146,7 +153,15 @@ public abstract class BlobStoreGateway extends SharedStorageGateway {
} }
try { try {
metaDataBlobContainer.writeBlob(newMetaData, new ByteArrayInputStream(builder.unsafeBytes(), 0, builder.unsafeBytesLength()), builder.unsafeBytesLength()); byte[] data = builder.unsafeBytes();
int size = builder.unsafeBytesLength();
if (compress) {
data = LZFEncoder.encode(data, size);
size = data.length;
}
metaDataBlobContainer.writeBlob(newMetaData, new ByteArrayInputStream(data, 0, size), size);
} catch (IOException e) { } catch (IOException e) {
throw new GatewayException("Failed to write metadata [" + newMetaData + "]", e); throw new GatewayException("Failed to write metadata [" + newMetaData + "]", e);
} }
@ -191,7 +206,13 @@ public abstract class BlobStoreGateway extends SharedStorageGateway {
private MetaData readMetaData(byte[] data) throws IOException { private MetaData readMetaData(byte[] data) throws IOException {
XContentParser parser = null; XContentParser parser = null;
try { try {
parser = XContentFactory.xContent(XContentType.JSON).createParser(data); if (LZFDecoder.isCompressed(data)) {
BytesStreamInput siBytes = new BytesStreamInput(data);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);
} else {
parser = XContentFactory.xContent(XContentType.JSON).createParser(data);
}
return MetaData.Builder.fromXContent(parser); return MetaData.Builder.fromXContent(parser);
} finally { } finally {
if (parser != null) { if (parser != null) {

View File

@ -31,10 +31,15 @@ import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.compress.lzf.LZFOutputStream;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.LZFStreamInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*; import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
@ -65,6 +70,9 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
private final TransportNodesListGatewayStartedShards listGatewayStartedShards; private final TransportNodesListGatewayStartedShards listGatewayStartedShards;
private final boolean compress;
private volatile LocalGatewayMetaState currentMetaState; private volatile LocalGatewayMetaState currentMetaState;
private volatile LocalGatewayStartedShards currentStartedShards; private volatile LocalGatewayStartedShards currentStartedShards;
@ -80,6 +88,8 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
this.nodeEnv = nodeEnv; this.nodeEnv = nodeEnv;
this.listGatewayMetaState = listGatewayMetaState.initGateway(this); this.listGatewayMetaState = listGatewayMetaState.initGateway(this);
this.listGatewayStartedShards = listGatewayStartedShards.initGateway(this); this.listGatewayStartedShards = listGatewayStartedShards.initGateway(this);
this.compress = componentSettings.getAsBoolean("compress", true);
} }
@Override public String type() { @Override public String type() {
@ -181,13 +191,15 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
try { try {
LocalGatewayMetaState stateToWrite = builder.build(); LocalGatewayMetaState stateToWrite = builder.build();
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON); XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON);
xContentBuilder.prettyPrint();
xContentBuilder.startObject(); xContentBuilder.startObject();
LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject(); xContentBuilder.endObject();
File stateFile = new File(location, "metadata-" + event.state().version()); File stateFile = new File(location, "metadata-" + event.state().version());
FileOutputStream fos = new FileOutputStream(stateFile); OutputStream fos = new FileOutputStream(stateFile);
if (compress) {
fos = new LZFOutputStream(fos);
}
fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength()); fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength());
fos.close(); fos.close();
@ -246,13 +258,15 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
try { try {
LocalGatewayStartedShards stateToWrite = builder.build(); LocalGatewayStartedShards stateToWrite = builder.build();
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON); XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON);
xContentBuilder.prettyPrint();
xContentBuilder.startObject(); xContentBuilder.startObject();
LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject(); xContentBuilder.endObject();
File stateFile = new File(location, "shards-" + event.state().version()); File stateFile = new File(location, "shards-" + event.state().version());
FileOutputStream fos = new FileOutputStream(stateFile); OutputStream fos = new FileOutputStream(stateFile);
if (compress) {
fos = new LZFOutputStream(fos);
}
fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength()); fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength());
fos.close(); fos.close();
@ -376,7 +390,13 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
private LocalGatewayMetaState readMetaState(byte[] data) throws IOException { private LocalGatewayMetaState readMetaState(byte[] data) throws IOException {
XContentParser parser = null; XContentParser parser = null;
try { try {
parser = XContentFactory.xContent(XContentType.JSON).createParser(data); if (LZFDecoder.isCompressed(data)) {
BytesStreamInput siBytes = new BytesStreamInput(data);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);
} else {
parser = XContentFactory.xContent(XContentType.JSON).createParser(data);
}
return LocalGatewayMetaState.Builder.fromXContent(parser); return LocalGatewayMetaState.Builder.fromXContent(parser);
} finally { } finally {
if (parser != null) { if (parser != null) {
@ -388,7 +408,13 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
private LocalGatewayStartedShards readStartedShards(byte[] data) throws IOException { private LocalGatewayStartedShards readStartedShards(byte[] data) throws IOException {
XContentParser parser = null; XContentParser parser = null;
try { try {
parser = XContentFactory.xContent(XContentType.JSON).createParser(data); if (LZFDecoder.isCompressed(data)) {
BytesStreamInput siBytes = new BytesStreamInput(data);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);
} else {
parser = XContentFactory.xContent(XContentType.JSON).createParser(data);
}
return LocalGatewayStartedShards.Builder.fromXContent(parser); return LocalGatewayStartedShards.Builder.fromXContent(parser);
} finally { } finally {
if (parser != null) { if (parser != null) {

View File

@ -73,6 +73,10 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA
return "/gateway/local/meta-state/node"; return "/gateway/local/meta-state/node";
} }
@Override protected boolean transportCompress() {
return true; // compress since the metadata can become large
}
@Override protected Request newRequest() { @Override protected Request newRequest() {
return new Request(); return new Request();
} }

View File

@ -73,6 +73,10 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
return "/gateway/local/started-shards/node"; return "/gateway/local/started-shards/node";
} }
@Override protected boolean transportCompress() {
return true; // this can become big...
}
@Override protected Request newRequest() { @Override protected Request newRequest() {
return new Request(); return new Request();
} }

View File

@ -32,8 +32,8 @@ public class TransportResponseOptions {
private boolean compress; private boolean compress;
public TransportResponseOptions withCompress() { public TransportResponseOptions withCompress(boolean compress) {
this.compress = true; this.compress = compress;
return this; return this;
} }

View File

@ -67,7 +67,7 @@ public class NettyTransportChannel implements TransportChannel {
@Override public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException { @Override public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException {
if (transport.compress) { if (transport.compress) {
options.withCompress(); options.withCompress(true);
} }
byte[] data = TransportStreams.buildResponse(requestId, message, options); byte[] data = TransportStreams.buildResponse(requestId, message, options);
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data); ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);

View File

@ -130,7 +130,7 @@ public abstract class AbstractSimpleTransportTests {
@Override public void messageReceived(StringMessage request, TransportChannel channel) { @Override public void messageReceived(StringMessage request, TransportChannel channel) {
assertThat("moshe", equalTo(request.message)); assertThat("moshe", equalTo(request.message));
try { try {
channel.sendResponse(new StringMessage("hello " + request.message), TransportResponseOptions.options().withCompress()); channel.sendResponse(new StringMessage("hello " + request.message), TransportResponseOptions.options().withCompress(true));
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
assertThat(e.getMessage(), false, equalTo(true)); assertThat(e.getMessage(), false, equalTo(true));