diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java new file mode 100644 index 00000000000..d73b24b3d8b --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java @@ -0,0 +1,11 @@ +package org.apache.hadoop.hbase.replication.regionserver; + +public interface MetricsReplicationSinkSource { + public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp"; + public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches"; + public static final String SINK_APPLIED_OPS = "sink.appliedOps"; + + void setLastAppliedOpAge(long age); + void incrAppliedBatches(long batches); + void incrAppliedOps(long batchsize); +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java new file mode 100644 index 00000000000..0e1c5cccab5 --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +public interface MetricsReplicationSourceFactory { + public MetricsReplicationSinkSource getSink(); + public MetricsReplicationSourceSource getSource(String id); + public MetricsReplicationSourceSource getGlobalSource(); +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java new file mode 100644 index 00000000000..66d265a90ba --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +public interface MetricsReplicationSourceSource { + + public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue"; + public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp"; + public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches"; + + public static final String SOURCE_SHIPPED_KBS = "source.shippedKBs"; + public static final String SOURCE_SHIPPED_OPS = "source.shippedOps"; + + public static final String SOURCE_LOG_READ_IN_BYTES = "source.logReadInBytes"; + public static final String SOURCE_LOG_READ_IN_EDITS = "source.logEditsRead"; + + public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered"; + + void setLastShippedAge(long age); + void setSizeOfLogQueue(int size); + void incrSizeOfLogQueue(int size); + void decrSizeOfLogQueue(int size); + void incrLogEditsFiltered(long size); + void incrBatchesShipped(int batches); + void incrOpsShipped(long ops); + void incrShippedKBs(long size); + void incrLogReadInBytes(long size); + void incrLogReadInEdits(long size); + void clear(); +} diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java new file mode 100644 index 00000000000..a210171577c --- /dev/null +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; + +public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{ + private final MetricsReplicationSourceImpl rms; + + private final MutableGaugeLong ageOfLastShippedOpGauge; + private final MutableGaugeLong sizeOfLogQueueGauge; + private final MutableCounterLong logReadInEditsCounter; + private final MutableCounterLong logEditsFilteredCounter; + private final MutableCounterLong shippedBatchesCounter; + private final MutableCounterLong shippedOpsCounter; + private final MutableCounterLong shippedKBsCounter; + private final MutableCounterLong logReadInBytesCounter; + + public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { + this.rms = rms; + + ageOfLastShippedOpGauge = rms.getMetricsRegistry().getLongGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, 0L); + + sizeOfLogQueueGauge = rms.getMetricsRegistry().getLongGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L); + + shippedBatchesCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_SHIPPED_BATCHES, 0L); + + shippedOpsCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_SHIPPED_OPS, 0L); + + shippedKBsCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_SHIPPED_KBS, 0L); + + logReadInBytesCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_READ_IN_BYTES, 0L); + + logReadInEditsCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_READ_IN_EDITS, 0L); + + logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_EDITS_FILTERED, 0L); + } + + @Override public void setLastShippedAge(long age) { + ageOfLastShippedOpGauge.set(age); + } + + @Override public void setSizeOfLogQueue(int size) { + sizeOfLogQueueGauge.set(size); + } + + @Override public void incrSizeOfLogQueue(int size) { + sizeOfLogQueueGauge.incr(size); + } + + @Override public void decrSizeOfLogQueue(int size) { + sizeOfLogQueueGauge.decr(size); + } + + @Override public void incrLogReadInEdits(long size) { + logReadInEditsCounter.incr(size); + } + + @Override public void incrLogEditsFiltered(long size) { + logEditsFilteredCounter.incr(size); + } + + @Override public void incrBatchesShipped(int batches) { + shippedBatchesCounter.incr(batches); + } + + @Override public void incrOpsShipped(long ops) { + shippedOpsCounter.incr(ops); + } + + @Override public void incrShippedKBs(long size) { + shippedKBsCounter.incr(size); + } + + @Override public void incrLogReadInBytes(long size) { + logReadInBytesCounter.incr(size); + } + + @Override public void clear() { + } +} diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java new file mode 100644 index 00000000000..3025e3e7724 --- /dev/null +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; + +public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkSource { + + private final MutableGaugeLong ageGauge; + private final MutableCounterLong batchesCounter; + private final MutableCounterLong opsCounter; + + public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) { + ageGauge = rms.getMetricsRegistry().getLongGauge(SINK_AGE_OF_LAST_APPLIED_OP, 0L); + batchesCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_BATCHES, 0L); + opsCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_OPS, 0L); + } + + @Override public void setLastAppliedOpAge(long age) { + ageGauge.set(age); + } + + @Override public void incrAppliedBatches(long batches) { + batchesCounter.incr(batches); + } + + @Override public void incrAppliedOps(long batchsize) { + opsCounter.incr(batchsize); + } +} diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java new file mode 100644 index 00000000000..cb78d3e2d97 --- /dev/null +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java @@ -0,0 +1,21 @@ +package org.apache.hadoop.hbase.replication.regionserver; + +public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSourceFactory { + + private static enum SourceHolder { + INSTANCE; + final MetricsReplicationSourceImpl source = new MetricsReplicationSourceImpl(); + } + + @Override public MetricsReplicationSinkSource getSink() { + return new MetricsReplicationSinkSourceImpl(SourceHolder.INSTANCE.source); + } + + @Override public MetricsReplicationSourceSource getSource(String id) { + return new MetricsReplicationSourceSourceImpl(SourceHolder.INSTANCE.source, id); + } + + @Override public MetricsReplicationSourceSource getGlobalSource() { + return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source); + } +} diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java new file mode 100644 index 00000000000..0e6b8c817d1 --- /dev/null +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -0,0 +1,111 @@ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; + +public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSourceSource { + + private final MetricsReplicationSourceImpl rms; + private final String id; + private final String sizeOfLogQueueKey; + private final String ageOfLastShippedOpKey; + private final String logReadInEditsKey; + private final String logEditsFilteredKey; + private final String shippedBatchesKey; + private final String shippedOpsKey; + private final String shippedKBsKey; + private final String logReadInBytesKey; + + private final MutableGaugeLong ageOfLastShippedOpGauge; + private final MutableGaugeLong sizeOfLogQueueGauge; + private final MutableCounterLong logReadInEditsCounter; + private final MutableCounterLong logEditsFilteredCounter; + private final MutableCounterLong shippedBatchesCounter; + private final MutableCounterLong shippedOpsCounter; + private final MutableCounterLong shippedKBsCounter; + private final MutableCounterLong logReadInBytesCounter; + + public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) { + this.rms = rms; + this.id = id; + + ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp"; + ageOfLastShippedOpGauge = rms.getMetricsRegistry().getLongGauge(ageOfLastShippedOpKey, 0L); + + sizeOfLogQueueKey = "source." + id + ".sizeOfLogQueue"; + sizeOfLogQueueGauge = rms.getMetricsRegistry().getLongGauge(sizeOfLogQueueKey, 0L); + + shippedBatchesKey = "source." + this.id + ".shippedBatches"; + shippedBatchesCounter = rms.getMetricsRegistry().getLongCounter(shippedBatchesKey, 0L); + + shippedOpsKey = "source." + this.id + ".shippedOps"; + shippedOpsCounter = rms.getMetricsRegistry().getLongCounter(shippedOpsKey, 0L); + + shippedKBsKey = "source." + this.id + ".shippedKBs"; + shippedKBsCounter = rms.getMetricsRegistry().getLongCounter(shippedKBsKey, 0L); + + logReadInBytesKey = "source." + this.id + ".logReadInBytes"; + logReadInBytesCounter = rms.getMetricsRegistry().getLongCounter(logReadInBytesKey, 0L); + + logReadInEditsKey = "source." + id + ".logEditsRead"; + logReadInEditsCounter = rms.getMetricsRegistry().getLongCounter(logReadInEditsKey, 0L); + + logEditsFilteredKey = "source." + id + ".logEditsFiltered"; + logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(logEditsFilteredKey, 0L); + } + + @Override public void setLastShippedAge(long age) { + ageOfLastShippedOpGauge.set(age); + } + + @Override public void setSizeOfLogQueue(int size) { + sizeOfLogQueueGauge.set(size); + } + + @Override public void incrSizeOfLogQueue(int size) { + sizeOfLogQueueGauge.incr(size); + } + + @Override public void decrSizeOfLogQueue(int size) { + sizeOfLogQueueGauge.decr(size); + } + + @Override public void incrLogReadInEdits(long size) { + logReadInEditsCounter.incr(size); + } + + @Override public void incrLogEditsFiltered(long size) { + logEditsFilteredCounter.incr(size); + } + + @Override public void incrBatchesShipped(int batches) { + shippedBatchesCounter.incr(batches); + } + + @Override public void incrOpsShipped(long ops) { + shippedOpsCounter.incr(ops); + } + + @Override public void incrShippedKBs(long size) { + shippedKBsCounter.incr(size); + } + + @Override public void incrLogReadInBytes(long size) { + logReadInBytesCounter.incr(size); + } + + @Override public void clear() { + rms.removeMetric(ageOfLastShippedOpKey); + + rms.removeMetric(sizeOfLogQueueKey); + + rms.removeMetric(shippedBatchesKey); + rms.removeMetric(shippedOpsKey); + rms.removeMetric(shippedKBsKey); + + rms.removeMetric(logReadInBytesKey); + rms.removeMetric(logReadInEditsKey); + + rms.removeMetric(logEditsFilteredKey); + } +} diff --git a/hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactory b/hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactory new file mode 100644 index 00000000000..50277b4ac97 --- /dev/null +++ b/hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactory @@ -0,0 +1 @@ +org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactoryImpl \ No newline at end of file diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceFactoryImpl.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceFactoryImpl.java new file mode 100644 index 00000000000..20162c2a44e --- /dev/null +++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceFactoryImpl.java @@ -0,0 +1,18 @@ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestMetricsReplicationSourceFactoryImpl { + + + @Test + public void testGetInstance() throws Exception { + MetricsReplicationSourceFactory rms = CompatibilitySingletonFactory + .getInstance(MetricsReplicationSourceFactory.class); + assertTrue(rms instanceof MetricsReplicationSourceFactoryImpl); + } + +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java index 00d6d6396c0..0c9d0169124 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; -import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSource; /** * This class is for maintaining the various replication statistics for a sink and publishing them @@ -29,15 +28,12 @@ import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSource @InterfaceAudience.Private public class MetricsSink { - public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp"; - public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches"; - public static final String SINK_APPLIED_OPS = "sink.appliedOps"; - - private MetricsReplicationSource rms; private long lastTimestampForAge = System.currentTimeMillis(); + private final MetricsReplicationSinkSource mss; public MetricsSink() { - rms = CompatibilitySingletonFactory.getInstance(MetricsReplicationSource.class); + mss = + CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getSink(); } /** @@ -52,7 +48,7 @@ public class MetricsSink { lastTimestampForAge = timestamp; age = System.currentTimeMillis() - lastTimestampForAge; } - rms.setGauge(SINK_AGE_OF_LAST_APPLIED_OP, age); + mss.setLastAppliedOpAge(age); return age; } @@ -71,8 +67,8 @@ public class MetricsSink { * @param batchSize */ public void applyBatch(long batchSize) { - rms.incCounters(SINK_APPLIED_BATCHES, 1); - rms.incCounters(SINK_APPLIED_OPS, batchSize); + mss.incrAppliedBatches(1); + mss.incrAppliedOps(batchSize); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 29ff1acfede..a734b9ce07f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -32,31 +32,13 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) public class MetricsSource { - public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue"; - public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp"; - public static final String SOURCE_LOG_EDITS_READ = "source.logEditsRead"; - public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered"; - public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches"; - public static final String SOURCE_SHIPPED_KBS = "source.shippedKBs"; - public static final String SOURCE_SHIPPED_OPS = "source.shippedOps"; - public static final String SOURCE_LOG_READ_IN_BYTES = "source.logReadInBytes"; - public static final Log LOG = LogFactory.getLog(MetricsSource.class); - private String id; private long lastTimestamp = 0; private int lastQueueSize = 0; - private String sizeOfLogQueKey; - private String ageOfLastShippedOpKey; - private String logEditsReadKey; - private String logEditsFilteredKey; - private final String shippedBatchesKey; - private final String shippedOpsKey; - private final String shippedKBsKey; - private final String logReadInBytesKey; - - private MetricsReplicationSource rms; + private final MetricsReplicationSourceSource singleSourceSource; + private final MetricsReplicationSourceSource globalSourceSource; /** * Constructor used to register the metrics @@ -64,17 +46,10 @@ public class MetricsSource { * @param id Name of the source this class is monitoring */ public MetricsSource(String id) { - this.id = id; - - sizeOfLogQueKey = "source." + id + ".sizeOfLogQueue"; - ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp"; - logEditsReadKey = "source." + id + ".logEditsRead"; - logEditsFilteredKey = "source." + id + ".logEditsFiltered"; - shippedBatchesKey = "source." + this.id + ".shippedBatches"; - shippedOpsKey = "source." + this.id + ".shippedOps"; - shippedKBsKey = "source." + this.id + ".shippedKBs"; - logReadInBytesKey = "source." + this.id + ".logReadInBytes"; - rms = CompatibilitySingletonFactory.getInstance(MetricsReplicationSource.class); + singleSourceSource = + CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) + .getSource(id); + globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); } /** @@ -84,8 +59,8 @@ public class MetricsSource { */ public void setAgeOfLastShippedOp(long timestamp) { long age = EnvironmentEdgeManager.currentTime() - timestamp; - rms.setGauge(ageOfLastShippedOpKey, age); - rms.setGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, age); + singleSourceSource.setLastShippedAge(age); + globalSourceSource.setLastShippedAge(age); this.lastTimestamp = timestamp; } @@ -105,8 +80,8 @@ public class MetricsSource { * @param size the size. */ public void setSizeOfLogQueue(int size) { - rms.setGauge(sizeOfLogQueKey, size); - rms.incGauge(SOURCE_SIZE_OF_LOG_QUEUE, size - lastQueueSize); + singleSourceSource.setSizeOfLogQueue(size); + globalSourceSource.incrSizeOfLogQueue(size - lastQueueSize); lastQueueSize = size; } @@ -116,8 +91,8 @@ public class MetricsSource { * @param delta the number of log edits read. */ private void incrLogEditsRead(long delta) { - rms.incCounters(logEditsReadKey, delta); - rms.incCounters(SOURCE_LOG_EDITS_READ, delta); + singleSourceSource.incrLogReadInEdits(delta); + globalSourceSource.incrLogReadInEdits(delta); } /** Increment the number of log edits read by one. */ @@ -131,8 +106,8 @@ public class MetricsSource { * @param delta the number filtered. */ private void incrLogEditsFiltered(long delta) { - rms.incCounters(logEditsFilteredKey, delta); - rms.incCounters(SOURCE_LOG_EDITS_FILTERED, delta); + singleSourceSource.incrLogEditsFiltered(delta); + globalSourceSource.incrLogEditsFiltered(delta); } /** The number of log edits filtered out. */ @@ -146,29 +121,26 @@ public class MetricsSource { * @param batchSize the size of the batch that was shipped to sinks. */ public void shipBatch(long batchSize, int sizeInKB) { - rms.incCounters(shippedBatchesKey, 1); - rms.incCounters(SOURCE_SHIPPED_BATCHES, 1); - rms.incCounters(shippedOpsKey, batchSize); - rms.incCounters(SOURCE_SHIPPED_OPS, batchSize); - rms.incCounters(shippedKBsKey, sizeInKB); - rms.incCounters(SOURCE_SHIPPED_KBS, sizeInKB); + singleSourceSource.incrBatchesShipped(1); + globalSourceSource.incrBatchesShipped(1); + + singleSourceSource.incrOpsShipped(batchSize); + globalSourceSource.incrOpsShipped(batchSize); + + singleSourceSource.incrShippedKBs(sizeInKB); + globalSourceSource.incrShippedKBs(sizeInKB); } /** increase the byte number read by source from log file */ public void incrLogReadInBytes(long readInBytes) { - rms.incCounters(logReadInBytesKey, readInBytes); - rms.incCounters(SOURCE_LOG_READ_IN_BYTES, readInBytes); + singleSourceSource.incrLogReadInBytes(readInBytes); + globalSourceSource.incrLogReadInBytes(readInBytes); } /** Removes all metrics about this Source. */ public void clear() { - rms.removeMetric(sizeOfLogQueKey); - rms.decGauge(SOURCE_SIZE_OF_LOG_QUEUE, lastQueueSize); + singleSourceSource.clear(); + globalSourceSource.decrSizeOfLogQueue(lastQueueSize); lastQueueSize = 0; - rms.removeMetric(ageOfLastShippedOpKey); - - rms.removeMetric(logEditsFilteredKey); - rms.removeMetric(logEditsReadKey); - } }