From 0072dd816b928c4f3b23a210bd959819509ed512 Mon Sep 17 00:00:00 2001 From: kimchy Date: Fri, 23 Jul 2010 23:10:59 +0300 Subject: [PATCH] nicer logging, and use sync streams --- .../support/ImmutableAppendableBlobContainer.java | 5 +++-- .../java/org/elasticsearch/gateway/GatewayService.java | 7 +++++-- .../index/gateway/IndexShardGatewayService.java | 3 ++- .../gateway/blobstore/BlobStoreIndexShardGateway.java | 4 ++-- .../cloud/blobstore/CloudImmutableBlobContainer.java | 2 +- 5 files changed, 13 insertions(+), 8 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/ImmutableAppendableBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/ImmutableAppendableBlobContainer.java index e540b8c3318..55111e84401 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/ImmutableAppendableBlobContainer.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/ImmutableAppendableBlobContainer.java @@ -24,9 +24,9 @@ import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.ImmutableBlobContainer; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.Sets; -import org.elasticsearch.common.io.FastByteArrayInputStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.Set; @@ -57,7 +57,8 @@ public class ImmutableAppendableBlobContainer extends AbstractBlobContainer impl listener.onFailure(e); return; } - FastByteArrayInputStream is = new FastByteArrayInputStream(out.unsafeByteArray(), 0, out.size()); + // use teh sync one + ByteArrayInputStream is = new ByteArrayInputStream(out.unsafeByteArray(), 0, out.size()); container.writeBlob(partBlobName, is, out.size(), new ImmutableBlobContainer.WriterListener() { @Override public void onCompleted() { listener.onCompleted(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index 6d391f674ac..88b855762b3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; +import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -159,9 +160,11 @@ public class GatewayService extends AbstractLifecycleComponent i } executor.execute(new Runnable() { @Override public void run() { - logger.debug("writing to gateway"); + logger.debug("writing to gateway {} ...", gateway); + StopWatch stopWatch = new StopWatch().start(); try { gateway.write(event.state().metaData()); + logger.debug("wrote to gateway {}, took {}", gateway, stopWatch.stop().totalTime()); // TODO, we need to remember that we failed, maybe add a retry scheduler? } catch (Exception e) { logger.error("failed to write to gateway", e); @@ -176,7 +179,7 @@ public class GatewayService extends AbstractLifecycleComponent i * when waiting, and indicates that everything was created within teh wait timeout. */ private Boolean readFromGateway(@Nullable TimeValue waitTimeout) { - logger.debug("reading state from gateway..."); + logger.debug("reading state from gateway {} ...", gateway); MetaData metaData; try { metaData = gateway.read(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index e7179cb2b3f..8252ce483a3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -189,7 +189,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem /** * Snapshots the given shard into the gateway. */ - public synchronized void snapshot(String reason) throws IndexShardGatewaySnapshotFailedException { + public synchronized void snapshot(final String reason) throws IndexShardGatewaySnapshotFailedException { if (!indexShard.routingEntry().primary()) { return; // throw new IndexShardGatewaySnapshotNotAllowedException(shardId, "Snapshot not allowed on non primary shard"); @@ -207,6 +207,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem @Override public IndexShardGateway.SnapshotStatus snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException { if (lastIndexVersion != snapshotIndexCommit.getVersion() || lastTranslogId != translogSnapshot.translogId() || lastTranslogLength < translogSnapshot.length()) { + logger.debug("snapshot ({}) to {} ...", reason, shardGateway); IndexShardGateway.SnapshotStatus snapshotStatus = shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogPosition, lastTranslogLength)); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java index 5198ae6a278..e417507c81f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java @@ -31,7 +31,6 @@ import org.elasticsearch.common.blobstore.*; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.Sets; -import org.elasticsearch.common.io.FastByteArrayInputStream; import org.elasticsearch.common.io.FastByteArrayOutputStream; import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; @@ -60,6 +59,7 @@ import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler; import org.elasticsearch.threadpool.ThreadPool; import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.lang.ref.SoftReference; @@ -675,7 +675,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo if (counter.decrementAndGet() == 0) { // now, write the expected md5 byte[] md5 = Digest.md5HexToByteArray(fileMetaData.md5()); - indexContainer.writeBlob(fileMetaData.name() + ".md5", new FastByteArrayInputStream(md5), md5.length, new ImmutableBlobContainer.WriterListener() { + indexContainer.writeBlob(fileMetaData.name() + ".md5", new ByteArrayInputStream(md5), md5.length, new ImmutableBlobContainer.WriterListener() { @Override public void onCompleted() { latch.countDown(); } diff --git a/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudImmutableBlobContainer.java b/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudImmutableBlobContainer.java index a63d0c604b1..f5ea4eba7ac 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudImmutableBlobContainer.java +++ b/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudImmutableBlobContainer.java @@ -39,7 +39,7 @@ public class CloudImmutableBlobContainer extends AbstractCloudBlobContainer impl } @Override public void writeBlob(String blobName, InputStream is, long sizeInBytes, final WriterListener listener) { - Blob blob = cloudBlobStore.sync().newBlob(buildBlobPath(blobName)); + Blob blob = cloudBlobStore.async().newBlob(buildBlobPath(blobName)); blob.setPayload(is); blob.setContentLength(sizeInBytes); final ListenableFuture future = cloudBlobStore.async().putBlob(cloudBlobStore.container(), blob);