Revert "HADOOP-17451. IOStatistics test failures in S3A code. (#2594)"

This reverts commit d3014e01f3.
(fixing commit text before it is frozen)
Author: Steve Loughran
Date:   2021-01-12 17:29:59 +00:00
Parent: d3014e01f3
Commit: 05c9c2ed02

7 changed files with 62 additions and 80 deletions


@@ -67,46 +67,23 @@ public class StorageStatisticsFromIOStatistics
   public Iterator<LongStatistic> getLongStatistics() {
     final Set<Map.Entry<String, Long>> counters = counters()
         .entrySet();
-    final Set<LongStatistic> statisticSet = counters.stream().map(
-        this::toLongStatistic)
-        .collect(Collectors.toSet());
-    // add the gauges
-    gauges().entrySet().forEach(entry ->
-        statisticSet.add(toLongStatistic(entry)));
-    return statisticSet.iterator();
-  }
-
-  /**
-   * Convert a counter/gauge entry to a long statistics.
-   * @param e entry
-   * @return statistic
-   */
-  private LongStatistic toLongStatistic(final Map.Entry<String, Long> e) {
-    return new LongStatistic(e.getKey(), e.getValue());
+    return counters.stream().map(e ->
+        new StorageStatistics.LongStatistic(e.getKey(), e.getValue()))
+        .collect(Collectors.toSet()).iterator();
   }

   private Map<String, Long> counters() {
     return ioStatistics.counters();
   }

-  private Map<String, Long> gauges() {
-    return ioStatistics.gauges();
-  }
-
   @Override
   public Long getLong(final String key) {
-    Long l = counters().get(key);
-    if (l == null) {
-      l = gauges().get(key);
-    }
-    return l;
+    return counters().get(key);
   }

   @Override
   public boolean isTracked(final String key) {
-    return counters().containsKey(key)
-        || gauges().containsKey(key);
+    return counters().containsKey(key);
   }

   @Override
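
Note for readers (a sketch, not part of the patch): after this revert the storage-statistics view is built from counters only, so gauge entries such as pending upload bytes are no longer reachable through the generic StorageStatistics API. A minimal illustration of what a caller now sees; the class name DumpCounters is hypothetical, and `fs` is assumed to be an already-initialised FileSystem.

    import java.util.Iterator;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.StorageStatistics;
    import org.apache.hadoop.fs.StorageStatistics.LongStatistic;

    public final class DumpCounters {
      /** Print every statistic visible via StorageStatistics; only counters appear after the revert. */
      public static void dump(FileSystem fs) {
        StorageStatistics stats = fs.getStorageStatistics();
        for (Iterator<LongStatistic> it = stats.getLongStatistics(); it.hasNext();) {
          LongStatistic s = it.next();
          // Looking up a gauge name through getLong() now returns null rather than the gauge value.
          System.out.println(s.getName() + " = " + s.getValue());
        }
      }
    }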


@@ -64,8 +64,10 @@ import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import java.io.Closeable;
 import java.net.URI;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -180,6 +182,20 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
    */
   private final IOStatisticsStore instanceIOStatistics;

+  /**
+   * Gauges to create.
+   * <p></p>
+   * All statistics which are not gauges or quantiles
+   * are registered as counters.
+   */
+  private static final Statistic[] GAUGES_TO_CREATE = {
+      OBJECT_PUT_REQUESTS_ACTIVE,
+      OBJECT_PUT_BYTES_PENDING,
+      STREAM_WRITE_BLOCK_UPLOADS_ACTIVE,
+      STREAM_WRITE_BLOCK_UPLOADS_PENDING,
+      STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING,
+  };
+
   /**
    * Construct the instrumentation for a filesystem.
    * @param name URI of filesystem.
@@ -195,6 +211,10 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
     // create the builder
     IOStatisticsStoreBuilder storeBuilder = iostatisticsStore();

+    // add the gauges
+    List<Statistic> gauges = Arrays.asList(GAUGES_TO_CREATE);
+    gauges.forEach(this::gauge);
+
     // declare all counter statistics
     EnumSet.allOf(Statistic.class).stream()
         .filter(statistic ->
@@ -203,14 +223,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
           counter(stat);
           storeBuilder.withCounters(stat.getSymbol());
         });
-    // declare all gauge statistics
-    EnumSet.allOf(Statistic.class).stream()
-        .filter(statistic ->
-            statistic.getType() == StatisticTypeEnum.TYPE_GAUGE)
-        .forEach(stat -> {
-          gauge(stat);
-          storeBuilder.withGauges(stat.getSymbol());
-        });

     // and durations
     EnumSet.allOf(Statistic.class).stream()
@@ -1340,13 +1352,15 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
       this.filesystemStatistics = filesystemStatistics;
       IOStatisticsStore st = iostatisticsStore()
           .withCounters(
-              STREAM_WRITE_BLOCK_UPLOADS.getSymbol(),
+              StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS,
               STREAM_WRITE_BYTES.getSymbol(),
               STREAM_WRITE_EXCEPTIONS.getSymbol(),
-              STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol(),
+              StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING,
+              STREAM_WRITE_TOTAL_TIME.getSymbol(),
               STREAM_WRITE_QUEUE_DURATION.getSymbol(),
               STREAM_WRITE_TOTAL_DATA.getSymbol(),
-              STREAM_WRITE_TOTAL_TIME.getSymbol())
+              STREAM_WRITE_EXCEPTIONS.getSymbol(),
+              STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol())
           .withGauges(
               STREAM_WRITE_BLOCK_UPLOADS_PENDING.getSymbol(),
               STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol())
@@ -1456,7 +1470,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
     @Override
     public void bytesTransferred(long byteCount) {
       bytesUploaded.addAndGet(byteCount);
-      incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, -byteCount);
+      incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, -byteCount);
     }

     @Override


@@ -76,7 +76,6 @@ import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.removeUndel
 import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.toPathList;
 import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertFileCount;
 import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.extractCause;
-import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 import static org.apache.hadoop.test.LambdaTestUtils.eval;
@@ -684,8 +683,7 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
           readOnlyFiles.size());
       rejectionCount.assertDiffEquals("Wrong rejection count",
           readOnlyFiles.size());
-      reset(rejectionCount, deleteVerbCount, deleteObjectCount,
-          bulkDeleteVerbCount);
+      reset(rejectionCount, deleteVerbCount, deleteObjectCount);
     }
     // all the files are still there? (avoid in scale test due to cost)
     if (!scaleTest) {
@@ -694,13 +692,9 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
     describe("Trying to delete upper-level directory");
     ex = expectDeleteForbidden(basePath);
-    String iostats = ioStatisticsSourceToString(roleFS);
-
     if (multiDelete) {
       // multi-delete status checks
-      deleteVerbCount.assertDiffEquals("Wrong delete request count", 0);
-      bulkDeleteVerbCount.assertDiffEquals(
-          "Wrong count of delete operations in " + iostats, 1);
+      deleteVerbCount.assertDiffEquals("Wrong delete count", 1);
       MultiObjectDeleteException mde = extractCause(
           MultiObjectDeleteException.class, ex);
       List<MultiObjectDeleteSupport.KeyPath> undeletedKeyPaths =


@@ -475,7 +475,7 @@ public class AbstractS3ACostTest extends AbstractS3ATestBase {
   /**
    * Execute a closure expecting a specific number of HEAD/LIST calls
-   * on <i>raw</i> S3 stores only. The operation is always evaluated.
+   * on <i>raw</i> S3 stores only.
    * @param cost expected cost
    * @param eval closure to evaluate
    * @param <T> return type of closure
@@ -484,8 +484,7 @@ public class AbstractS3ACostTest extends AbstractS3ATestBase {
   protected <T> T verifyRaw(
       OperationCost cost,
       Callable<T> eval) throws Exception {
-    return verifyMetrics(eval,
-        whenRaw(cost), OperationCostValidator.always());
+    return verifyMetrics(eval, whenRaw(cost));
   }

   /**
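
For orientation: verifyRaw() is the helper cost tests use to assert HEAD/LIST counts against a raw (unguarded) S3 store. A rough sketch of how a subclass of AbstractS3ACostTest typically drives it; the cost constant GET_FILE_STATUS_ON_FILE and methodPath() are assumed to be available and are placeholders here, not taken from this patch.

    // Sketch: assert the HEAD/LIST cost of a getFileStatus() call on a raw store.
    @Test
    public void testStatusCostOnFile() throws Throwable {
      Path file = methodPath();
      // create the file first so the status probe succeeds
      ContractTestUtils.touch(getFileSystem(), file);
      verifyRaw(GET_FILE_STATUS_ON_FILE, () ->
          getFileSystem().getFileStatus(file));
    }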


@@ -121,21 +121,17 @@ public class ITestS3ADeleteCost extends AbstractS3ACostTest {
         with(DIRECTORIES_DELETED, 0),
         with(FILES_DELETED, 1),

-        // a single DELETE call is made to delete the object
-        with(OBJECT_DELETE_REQUEST, DELETE_OBJECT_REQUEST),
-
         // keeping: create no parent dirs or delete parents
         withWhenKeeping(DIRECTORIES_CREATED, 0),
-        withWhenKeeping(OBJECT_BULK_DELETE_REQUEST, 0),
+        withWhenKeeping(OBJECT_DELETE_OBJECTS, DELETE_OBJECT_REQUEST),

         // deleting: create a parent and delete any of its parents
         withWhenDeleting(DIRECTORIES_CREATED, 1),
-        // a bulk delete for all parents is issued.
-        // the number of objects in it depends on the depth of the tree;
-        // don't worry about that
-        withWhenDeleting(OBJECT_BULK_DELETE_REQUEST, DELETE_MARKER_REQUEST)
+        // two objects will be deleted
+        withWhenDeleting(OBJECT_DELETE_OBJECTS,
+            DELETE_OBJECT_REQUEST
+            + DELETE_MARKER_REQUEST)
     );

     // there is an empty dir for a parent
     S3AFileStatus status = verifyRawInnerGetFileStatus(dir, true,
         StatusProbeEnum.ALL, GET_FILE_STATUS_ON_DIR);
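
For readers unfamiliar with the probe DSL used in this hunk: with() appears to apply a delta expectation unconditionally, while withWhenKeeping()/withWhenDeleting() apply it only under the keep- or delete-directory-marker policy. A compressed sketch of the call shape, with the operation, path variable and expected numbers chosen purely for illustration:

    // Sketch only: shape of a verifyMetrics() call mixing unconditional and policy-dependent probes.
    verifyMetrics(() -> getFileSystem().delete(testFile, false),
        with(FILES_DELETED, 1),
        withWhenKeeping(DIRECTORIES_CREATED, 0),
        withWhenDeleting(DIRECTORIES_CREATED, 1));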


@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
@@ -48,11 +49,10 @@ import org.apache.hadoop.util.Progressable;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
-import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING;
-import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic;
-import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
-import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
-import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;

 /**
  * Scale test which creates a huge file.
@@ -169,8 +169,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     // there's lots of logging here, so that a tail -f on the output log
     // can give a view of what is happening.
     S3AFileSystem fs = getFileSystem();
-    IOStatistics iostats = fs.getIOStatistics();
+    StorageStatistics storageStatistics = fs.getStorageStatistics();
     String putRequests = Statistic.OBJECT_PUT_REQUESTS.getSymbol();
     String putBytes = Statistic.OBJECT_PUT_BYTES.getSymbol();
     Statistic putRequestsActive = Statistic.OBJECT_PUT_REQUESTS_ACTIVE;
@@ -206,9 +205,9 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
               percentage,
               writtenMB,
               filesizeMB,
-              iostats.counters().get(putBytes),
+              storageStatistics.getLong(putBytes),
               gaugeValue(putBytesPending),
-              iostats.counters().get(putRequests),
+              storageStatistics.getLong(putRequests),
               gaugeValue(putRequestsActive),
               elapsedTime,
               writtenMB / elapsedTime));
@@ -228,27 +227,27 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     logFSState();
     bandwidth(timer, filesize);
     LOG.info("Statistics after stream closed: {}", streamStatistics);
-    IOStatistics iostats = snapshotIOStatistics(
-        retrieveIOStatistics(getFileSystem()));
     LOG.info("IOStatistics after upload: {}",
         demandStringifyIOStatistics(iostats));
-    long putRequestCount = lookupCounterStatistic(iostats, putRequests);
-    long putByteCount = lookupCounterStatistic(iostats, putBytes);
+    long putRequestCount = storageStatistics.getLong(putRequests);
+    Long putByteCount = storageStatistics.getLong(putBytes);
     Assertions.assertThat(putRequestCount)
         .describedAs("Put request count from filesystem stats %s",
             iostats)
         .isGreaterThan(0);
     Assertions.assertThat(putByteCount)
-        .describedAs("%s count from filesystem stats %s",
-            putBytes, iostats)
+        .describedAs("putByteCount count from filesystem stats %s",
+            iostats)
         .isGreaterThan(0);
     LOG.info("PUT {} bytes in {} operations; {} MB/operation",
         putByteCount, putRequestCount,
         putByteCount / (putRequestCount * _1MB));
     LOG.info("Time per PUT {} nS",
         toHuman(timer.nanosPerOperation(putRequestCount)));
-    verifyStatisticGaugeValue(iostats, putRequestsActive.getSymbol(), 0);
-    verifyStatisticGaugeValue(iostats,
-        STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol(), 0);
+    assertEquals("active put requests in \n" + fs,
+        0, gaugeValue(putRequestsActive));
     progress.verifyNoFailures(
         "Put file " + fileToCreate + " of size " + filesize);
     if (streamStatistics != null) {
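
One side effect of moving the test back to StorageStatistics: getLong() returns a boxed Long which may be null for an untracked key, so the AssertJ check on putByteCount also effectively guards the later division. A guarded read, if ever needed, might look like the following sketch (variable names are placeholders):

    // Sketch: null-safe read of a StorageStatistics counter.
    Long raw = storageStatistics.getLong(putBytes);
    long safePutByteCount = raw == null ? 0L : raw;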


@@ -23,9 +23,11 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
 import org.apache.hadoop.fs.s3a.S3ATestConstants;
 import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,7 +35,6 @@ import org.slf4j.LoggerFactory;
 import java.io.InputStream;

 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
-import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupGaugeStatistic;

 /**
  * Base class for scale tests; here is where the common scale configuration
@@ -183,15 +184,17 @@ public class S3AScaleTestBase extends AbstractS3ATestBase {
   }

   /**
-   * Get the gauge value of a statistic from the
-   * IOStatistics of the filesystem. Raises an assertion if
+   * Get the gauge value of a statistic. Raises an assertion if
    * there is no such gauge.
    * @param statistic statistic to look up
    * @return the value.
    */
   public long gaugeValue(Statistic statistic) {
-    return lookupGaugeStatistic(getFileSystem().getIOStatistics(),
-        statistic.getSymbol());
+    S3AInstrumentation instrumentation = getFileSystem().getInstrumentation();
+    MutableGaugeLong gauge = instrumentation.lookupGauge(statistic.getSymbol());
+    assertNotNull("No gauge " + statistic
+        + " in " + instrumentation.dump("", " = ", "\n", true), gauge);
+    return gauge.value();
   }

   /**
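
With this revert, gaugeValue() reads the metrics2 MutableGaugeLong registered by S3AInstrumentation rather than a gauge in the filesystem IOStatistics, and fails with a dump of the metrics registry if the gauge was never created. A short sketch of a scale test asserting that a gauge has drained back to zero; the test class and method names are illustrative only:

    import org.junit.Test;
    import org.apache.hadoop.fs.s3a.Statistic;

    public class ITestExampleGaugeDrain extends S3AScaleTestBase {

      @Test
      public void testNoActivePutsAfterUpload() throws Throwable {
        // ... upload data and close the output stream here ...
        // gaugeValue() asserts the gauge exists, then returns its current value
        assertEquals("active PUT requests still pending",
            0, gaugeValue(Statistic.OBJECT_PUT_REQUESTS_ACTIVE));
      }
    }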