nicer logging, and use sync streams
This commit is contained in:
parent
23ddf8bc53
commit
0072dd816b
|
@ -24,9 +24,9 @@ import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||||
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
|
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
|
||||||
import org.elasticsearch.common.collect.ImmutableMap;
|
import org.elasticsearch.common.collect.ImmutableMap;
|
||||||
import org.elasticsearch.common.collect.Sets;
|
import org.elasticsearch.common.collect.Sets;
|
||||||
import org.elasticsearch.common.io.FastByteArrayInputStream;
|
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
@ -57,7 +57,8 @@ public class ImmutableAppendableBlobContainer extends AbstractBlobContainer impl
|
||||||
listener.onFailure(e);
|
listener.onFailure(e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
FastByteArrayInputStream is = new FastByteArrayInputStream(out.unsafeByteArray(), 0, out.size());
|
// use teh sync one
|
||||||
|
ByteArrayInputStream is = new ByteArrayInputStream(out.unsafeByteArray(), 0, out.size());
|
||||||
container.writeBlob(partBlobName, is, out.size(), new ImmutableBlobContainer.WriterListener() {
|
container.writeBlob(partBlobName, is, out.size(), new ImmutableBlobContainer.WriterListener() {
|
||||||
@Override public void onCompleted() {
|
@Override public void onCompleted() {
|
||||||
listener.onCompleted();
|
listener.onCompleted();
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
|
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
|
||||||
|
import org.elasticsearch.common.StopWatch;
|
||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -159,9 +160,11 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
||||||
}
|
}
|
||||||
executor.execute(new Runnable() {
|
executor.execute(new Runnable() {
|
||||||
@Override public void run() {
|
@Override public void run() {
|
||||||
logger.debug("writing to gateway");
|
logger.debug("writing to gateway {} ...", gateway);
|
||||||
|
StopWatch stopWatch = new StopWatch().start();
|
||||||
try {
|
try {
|
||||||
gateway.write(event.state().metaData());
|
gateway.write(event.state().metaData());
|
||||||
|
logger.debug("wrote to gateway {}, took {}", gateway, stopWatch.stop().totalTime());
|
||||||
// TODO, we need to remember that we failed, maybe add a retry scheduler?
|
// TODO, we need to remember that we failed, maybe add a retry scheduler?
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("failed to write to gateway", e);
|
logger.error("failed to write to gateway", e);
|
||||||
|
@ -176,7 +179,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
||||||
* when waiting, and indicates that everything was created within teh wait timeout.
|
* when waiting, and indicates that everything was created within teh wait timeout.
|
||||||
*/
|
*/
|
||||||
private Boolean readFromGateway(@Nullable TimeValue waitTimeout) {
|
private Boolean readFromGateway(@Nullable TimeValue waitTimeout) {
|
||||||
logger.debug("reading state from gateway...");
|
logger.debug("reading state from gateway {} ...", gateway);
|
||||||
MetaData metaData;
|
MetaData metaData;
|
||||||
try {
|
try {
|
||||||
metaData = gateway.read();
|
metaData = gateway.read();
|
||||||
|
|
|
@ -189,7 +189,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
|
||||||
/**
|
/**
|
||||||
* Snapshots the given shard into the gateway.
|
* Snapshots the given shard into the gateway.
|
||||||
*/
|
*/
|
||||||
public synchronized void snapshot(String reason) throws IndexShardGatewaySnapshotFailedException {
|
public synchronized void snapshot(final String reason) throws IndexShardGatewaySnapshotFailedException {
|
||||||
if (!indexShard.routingEntry().primary()) {
|
if (!indexShard.routingEntry().primary()) {
|
||||||
return;
|
return;
|
||||||
// throw new IndexShardGatewaySnapshotNotAllowedException(shardId, "Snapshot not allowed on non primary shard");
|
// throw new IndexShardGatewaySnapshotNotAllowedException(shardId, "Snapshot not allowed on non primary shard");
|
||||||
|
@ -207,6 +207,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
|
||||||
@Override public IndexShardGateway.SnapshotStatus snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException {
|
@Override public IndexShardGateway.SnapshotStatus snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException {
|
||||||
if (lastIndexVersion != snapshotIndexCommit.getVersion() || lastTranslogId != translogSnapshot.translogId() || lastTranslogLength < translogSnapshot.length()) {
|
if (lastIndexVersion != snapshotIndexCommit.getVersion() || lastTranslogId != translogSnapshot.translogId() || lastTranslogLength < translogSnapshot.length()) {
|
||||||
|
|
||||||
|
logger.debug("snapshot ({}) to {} ...", reason, shardGateway);
|
||||||
IndexShardGateway.SnapshotStatus snapshotStatus =
|
IndexShardGateway.SnapshotStatus snapshotStatus =
|
||||||
shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogPosition, lastTranslogLength));
|
shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogPosition, lastTranslogLength));
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,6 @@ import org.elasticsearch.common.blobstore.*;
|
||||||
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
|
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
|
||||||
import org.elasticsearch.common.collect.ImmutableMap;
|
import org.elasticsearch.common.collect.ImmutableMap;
|
||||||
import org.elasticsearch.common.collect.Sets;
|
import org.elasticsearch.common.collect.Sets;
|
||||||
import org.elasticsearch.common.io.FastByteArrayInputStream;
|
|
||||||
import org.elasticsearch.common.io.FastByteArrayOutputStream;
|
import org.elasticsearch.common.io.FastByteArrayOutputStream;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamInput;
|
import org.elasticsearch.common.io.stream.BytesStreamInput;
|
||||||
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
|
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
|
||||||
|
@ -60,6 +59,7 @@ import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.ref.SoftReference;
|
import java.lang.ref.SoftReference;
|
||||||
|
@ -675,7 +675,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
|
||||||
if (counter.decrementAndGet() == 0) {
|
if (counter.decrementAndGet() == 0) {
|
||||||
// now, write the expected md5
|
// now, write the expected md5
|
||||||
byte[] md5 = Digest.md5HexToByteArray(fileMetaData.md5());
|
byte[] md5 = Digest.md5HexToByteArray(fileMetaData.md5());
|
||||||
indexContainer.writeBlob(fileMetaData.name() + ".md5", new FastByteArrayInputStream(md5), md5.length, new ImmutableBlobContainer.WriterListener() {
|
indexContainer.writeBlob(fileMetaData.name() + ".md5", new ByteArrayInputStream(md5), md5.length, new ImmutableBlobContainer.WriterListener() {
|
||||||
@Override public void onCompleted() {
|
@Override public void onCompleted() {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,7 @@ public class CloudImmutableBlobContainer extends AbstractCloudBlobContainer impl
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void writeBlob(String blobName, InputStream is, long sizeInBytes, final WriterListener listener) {
|
@Override public void writeBlob(String blobName, InputStream is, long sizeInBytes, final WriterListener listener) {
|
||||||
Blob blob = cloudBlobStore.sync().newBlob(buildBlobPath(blobName));
|
Blob blob = cloudBlobStore.async().newBlob(buildBlobPath(blobName));
|
||||||
blob.setPayload(is);
|
blob.setPayload(is);
|
||||||
blob.setContentLength(sizeInBytes);
|
blob.setContentLength(sizeInBytes);
|
||||||
final ListenableFuture<String> future = cloudBlobStore.async().putBlob(cloudBlobStore.container(), blob);
|
final ListenableFuture<String> future = cloudBlobStore.async().putBlob(cloudBlobStore.container(), blob);
|
||||||
|
|
Loading…
Reference in New Issue