From f55ddc40663a58a13f16a11d990b163a31f8b4be Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Thu, 14 May 2026 17:14:46 +0000 Subject: [PATCH 1/2] Add a metric to capture write enqueue latency This captures from when the caller tries to send a request until the request is finally sent on the wire, primarily so we can see when the enqueue time is increasing (a signal for pressure on the IO loop). --- .../netflix/evcache/test/EVCacheTestDI.java | 26 +++++++++ .../metrics/EVCacheMetricsFactory.java | 1 + .../operation/EVCacheBulkGetFuture.java | 7 +++ .../operation/EVCacheOperationFuture.java | 8 ++- .../netflix/evcache/pool/EVCacheClient.java | 27 +++++++++ .../spy/memcached/EVCacheMemcachedClient.java | 2 +- .../EVCacheBulkGetFutureLatencyTest.java | 41 +++++++++++++ .../EVCacheOperationFutureLatencyTest.java | 57 +++++++++++++++++++ evcache-core/src/test/java/test-suite.xml | 2 + 9 files changed, 169 insertions(+), 2 deletions(-) create mode 100644 evcache-core/src/test/java/com/netflix/evcache/operation/EVCacheBulkGetFutureLatencyTest.java create mode 100644 evcache-core/src/test/java/com/netflix/evcache/operation/EVCacheOperationFutureLatencyTest.java diff --git a/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java b/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java index 63cc7677..d95875dc 100644 --- a/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java +++ b/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java @@ -21,6 +21,7 @@ import com.netflix.spectator.api.Gauge; import com.netflix.spectator.api.Id; import com.netflix.spectator.api.Registry; +import com.netflix.spectator.api.Timer; import com.netflix.spectator.api.patterns.PolledMeter; import java.util.HashMap; import java.util.List; @@ -111,6 +112,31 @@ public void testLoopCpuUtilizationMetricRegistered() throws Exception { assertTrue(nonZero, "expected loop CPU utilization meter to report a non-zero value"); } + @Test(dependsOnMethods = { "testLoopCpuUtilizationMetricRegistered" }) + public void testLoopEnqueueToWriteLatencyMetricRecords() throws Exception { + final Registry registry = EVCacheMetricsFactory.getInstance().getRegistry(); + final Map> clientsByServerGroup = manager.getEVCacheClientPool(appName).getAllInstancesByServerGroup(); + assertFalse(clientsByServerGroup.isEmpty(), "expected EVCache clients for " + appName); + + boolean recorded = false; + for (int attempt = 0; attempt < 10 && !recorded; attempt++) { + get(attempt, evCache); + Thread.sleep(50); + for (List clients : clientsByServerGroup.values()) { + for (EVCacheClient client : clients) { + final Id id = EVCacheMetricsFactory.getInstance().getId(EVCacheMetricsFactory.INTERNAL_LOOP_ENQUEUE_TO_WRITE_LATENCY, client.getTagList()); + final Timer timer = registry.timer(id); + if (timer.count() > 0L) { + recorded = true; + break; + } + } + if (recorded) break; + } + } + assertTrue(recorded, "expected enqueueToWriteLatency timer to record at least one sample"); + } + @Test(dependsOnMethods = { "testEVCache" }) public void testKeySizeCheck() throws Exception { final String key = "This is an invalid key"; diff --git a/evcache-core/src/main/java/com/netflix/evcache/metrics/EVCacheMetricsFactory.java b/evcache-core/src/main/java/com/netflix/evcache/metrics/EVCacheMetricsFactory.java index 2de0adda..bfe40e8a 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/metrics/EVCacheMetricsFactory.java +++ b/evcache-core/src/main/java/com/netflix/evcache/metrics/EVCacheMetricsFactory.java @@ -285,6 +285,7 @@ public String getStatusCode(StatusCode sc) { public static final String INTERNAL_EXECUTOR_SCHEDULED = "internal.evc.client.scheduledExecutor"; public static final String INTERNAL_POOL_INIT_ERROR = "internal.evc.client.init.error"; public static final String INTERNAL_LOOP_CPU_UTILIZATION = "internal.evc.client.loop.cpuUtilization"; + public static final String INTERNAL_LOOP_ENQUEUE_TO_WRITE_LATENCY = "internal.evc.client.loop.enqueueToWriteLatency"; public static final String INTERNAL_NUM_CHUNK_SIZE = "internal.evc.client.chunking.numOfChunks"; public static final String INTERNAL_CHUNK_DATA_SIZE = "internal.evc.client.chunking.dataSize"; diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java index b8379136..d2f7b7bf 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java @@ -53,15 +53,21 @@ public class EVCacheBulkGetFuture extends BulkGetFuture { private final Collection ops; private final CountDownLatch latch; private final long start; + private final long operationAttachedNs; private final EVCacheClient client; private AtomicReferenceArray operationStates; public EVCacheBulkGetFuture(Map> m, Collection getOps, CountDownLatch l, ExecutorService service, EVCacheClient client) { + this(m, getOps, l, service, client, System.nanoTime()); + } + + public EVCacheBulkGetFuture(Map> m, Collection getOps, CountDownLatch l, ExecutorService service, EVCacheClient client, long operationAttachedNs) { super(m, getOps, l, service); rvMap = m; ops = getOps; latch = l; this.start = System.currentTimeMillis(); + this.operationAttachedNs = operationAttachedNs; this.client = client; this.operationStates = null; } @@ -345,6 +351,7 @@ public void signalComplete() { public void signalSingleOpComplete(int sequenceNo, GetOperation op) { this.operationStates.set(sequenceNo, new SingleOperationState(op)); + client.recordLoopEnqueueToWriteLatency(op, operationAttachedNs); } public boolean cancel(boolean ign) { diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheOperationFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheOperationFuture.java index 2b9941cd..44ae462a 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheOperationFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheOperationFuture.java @@ -74,6 +74,7 @@ private static final class LazySharedExecutor { private final CountDownLatch latch; private final AtomicReference objRef; private Operation op; + private long operationAttachedNs; private final String key; private final long start; private final EVCacheClient client; @@ -93,6 +94,7 @@ public Operation getOperation() { public void setOperation(Operation to) { this.op = to; + this.operationAttachedNs = System.nanoTime(); super.setOperation(to); } @@ -380,7 +382,11 @@ public void call() { } public void signalComplete() { - super.signalComplete(); + try { + client.recordLoopEnqueueToWriteLatency(op, operationAttachedNs); + } finally { + super.signalComplete(); + } } /** diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java index 6895389f..c2b08083 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java @@ -23,6 +23,7 @@ import com.netflix.spectator.api.Id; import com.netflix.spectator.api.Registry; import com.netflix.spectator.api.Tag; +import com.netflix.spectator.api.Timer; import com.netflix.spectator.api.patterns.PolledMeter; import java.io.BufferedInputStream; import java.io.IOException; @@ -32,6 +33,7 @@ import java.net.SocketAddress; import java.net.URLDecoder; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Collection; @@ -106,6 +108,7 @@ public class EVCacheClient { private List tags; private final Map counterMap = new ConcurrentHashMap(); private final Id loopCpuUtilizationId; + private final Timer loopEnqueueToWriteLatency; private final Property hashingAlgo; protected final Counter operationsCounter; private final boolean isDuetClient; @@ -155,6 +158,9 @@ public class EVCacheClient { PolledMeter.using(registry) .withId(loopCpuUtilizationId) .monitorValue(this.evcacheMemcachedClient.getLoopProbe(), EVCacheLoopProbe::sampleUtilization); + this.loopEnqueueToWriteLatency = EVCacheMetricsFactory.getInstance() + .getPercentileTimer(EVCacheMetricsFactory.INTERNAL_LOOP_ENQUEUE_TO_WRITE_LATENCY, + this.tags, Duration.ofMillis(100)); this.evcacheMemcachedClient.addObserver(connectionObserver); this.decodingTranscoder = new EVCacheSerializingTranscoder(Integer.MAX_VALUE); @@ -1695,6 +1701,27 @@ public Counter getOperationCounter() { return operationsCounter; } + /** + * Record per-operation in-process latency from when an EVCache future attached + * a spymemcached {@link net.spy.memcached.ops.Operation} (immediately before + * enqueue into the memcached connection) to when the loop thread finished + * writing the operation to the socket. + * + *

This is the lagging knee-detector that complements the phase 1 loop CPU + * utilization gauge. It is no-throw on purpose so a bad measurement cannot + * break an operation completion callback. + */ + public void recordLoopEnqueueToWriteLatency(net.spy.memcached.ops.Operation op, long operationAttachedNs) { + if (op == null || operationAttachedNs <= 0L) return; + try { + final long wc = op.getWriteCompleteTimestamp(); + if (wc <= 0L || wc < operationAttachedNs) return; + loopEnqueueToWriteLatency.record(wc - operationAttachedNs, TimeUnit.NANOSECONDS); + } catch (Throwable t) { + if (log.isDebugEnabled()) log.debug("recordLoopEnqueueToWriteLatency failed", t); + } + } + /** * Return the keys upto the limit. The key will be cannoicalized key( or hashed Key).
diff --git a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java index 2d196dea..b8cfe97c 100644 --- a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java +++ b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java @@ -389,7 +389,7 @@ public EVCacheBulkGetFuture asyncGetBulk(Collection plainKeys, int initialLatchCount = chunks.isEmpty() ? 0 : 1; final CountDownLatch latch = new CountDownLatch(initialLatchCount); final Collection ops = new ArrayList(chunks.size()); - final EVCacheBulkGetFuture rv = new EVCacheBulkGetFuture(m, ops, latch, executorService, client); + final EVCacheBulkGetFuture rv = new EVCacheBulkGetFuture(m, ops, latch, executorService, client, System.nanoTime()); rv.setExpectedCount(chunks.size()); final DistributionSummary dataSizeDS = getDataSizeDistributionSummary( diff --git a/evcache-core/src/test/java/com/netflix/evcache/operation/EVCacheBulkGetFutureLatencyTest.java b/evcache-core/src/test/java/com/netflix/evcache/operation/EVCacheBulkGetFutureLatencyTest.java new file mode 100644 index 00000000..01605637 --- /dev/null +++ b/evcache-core/src/test/java/com/netflix/evcache/operation/EVCacheBulkGetFutureLatencyTest.java @@ -0,0 +1,41 @@ +package com.netflix.evcache.operation; + +import com.netflix.evcache.pool.EVCacheClient; +import net.spy.memcached.ops.GetOperation; +import net.spy.memcached.ops.Operation; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class EVCacheBulkGetFutureLatencyTest { + + @Test + public void recordsLatencyAgainstSharedBulkTimestampForEachChunk() { + EVCacheClient client = mock(EVCacheClient.class); + long bulkAttachedNs = System.nanoTime(); + + Map> rvMap = new HashMap>(); + Collection ops = new ArrayList(); + + EVCacheBulkGetFuture future = new EVCacheBulkGetFuture( + rvMap, ops, new CountDownLatch(1), null, client, bulkAttachedNs); + future.setExpectedCount(2); + + GetOperation chunkOne = mock(GetOperation.class); + GetOperation chunkTwo = mock(GetOperation.class); + + future.signalSingleOpComplete(0, chunkOne); + future.signalSingleOpComplete(1, chunkTwo); + + verify(client).recordLoopEnqueueToWriteLatency(chunkOne, bulkAttachedNs); + verify(client).recordLoopEnqueueToWriteLatency(chunkTwo, bulkAttachedNs); + } +} diff --git a/evcache-core/src/test/java/com/netflix/evcache/operation/EVCacheOperationFutureLatencyTest.java b/evcache-core/src/test/java/com/netflix/evcache/operation/EVCacheOperationFutureLatencyTest.java new file mode 100644 index 00000000..9205f84f --- /dev/null +++ b/evcache-core/src/test/java/com/netflix/evcache/operation/EVCacheOperationFutureLatencyTest.java @@ -0,0 +1,57 @@ +package com.netflix.evcache.operation; + +import com.netflix.evcache.pool.EVCacheClient; +import net.spy.memcached.ops.Operation; +import org.mockito.ArgumentCaptor; +import org.testng.annotations.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertTrue; + +public class EVCacheOperationFutureLatencyTest { + + @Test + public void recordsLatencyOnSignalCompleteForSingleOp() { + EVCacheClient client = mock(EVCacheClient.class); + Operation op = mock(Operation.class); + // pretend the write completed shortly after setOperation() + when(op.getWriteCompleteTimestamp()).thenReturn(System.nanoTime() + 1_000_000L); + + EVCacheOperationFuture future = new EVCacheOperationFuture( + "key", new CountDownLatch(1), new AtomicReference(null), 1000L, null, client); + + future.setOperation(op); + future.signalComplete(); + + ArgumentCaptor startCaptor = ArgumentCaptor.forClass(Long.class); + verify(client).recordLoopEnqueueToWriteLatency(org.mockito.Matchers.eq(op), startCaptor.capture()); + assertTrue(startCaptor.getValue() > 0L, + "expected operationAttachedNs to be captured > 0, got " + startCaptor.getValue()); + } + + @Test + public void recordsLatencyWithLatestAttachedTimestampOnRetry() throws Exception { + EVCacheClient client = mock(EVCacheClient.class); + Operation firstOp = mock(Operation.class); + Operation retryOp = mock(Operation.class); + + EVCacheOperationFuture future = new EVCacheOperationFuture( + "key", new CountDownLatch(1), new AtomicReference(null), 1000L, null, client); + + future.setOperation(firstOp); + long firstStart = System.nanoTime(); + Thread.sleep(2); + future.setOperation(retryOp); + future.signalComplete(); + + ArgumentCaptor startCaptor = ArgumentCaptor.forClass(Long.class); + verify(client).recordLoopEnqueueToWriteLatency(org.mockito.Matchers.eq(retryOp), startCaptor.capture()); + assertTrue(startCaptor.getValue() >= firstStart, + "expected retry timestamp to be >= first attach time"); + } +} diff --git a/evcache-core/src/test/java/test-suite.xml b/evcache-core/src/test/java/test-suite.xml index 7d7568a6..2112cbff 100644 --- a/evcache-core/src/test/java/test-suite.xml +++ b/evcache-core/src/test/java/test-suite.xml @@ -4,6 +4,8 @@ + + From a6d34e2295357b0ed65ae4dd3a55e18e30bd5e51 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Mon, 18 May 2026 19:06:32 +0000 Subject: [PATCH 2/2] fix: flakey test and improve comments --- .../netflix/evcache/test/EVCacheTestDI.java | 13 +++++++++--- .../netflix/evcache/pool/EVCacheClient.java | 20 +++++++++---------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java b/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java index d95875dc..206ce838 100644 --- a/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java +++ b/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java @@ -118,10 +118,17 @@ public void testLoopEnqueueToWriteLatencyMetricRecords() throws Exception { final Map> clientsByServerGroup = manager.getEVCacheClientPool(appName).getAllInstancesByServerGroup(); assertFalse(clientsByServerGroup.isEmpty(), "expected EVCache clients for " + appName); + // recordLoopEnqueueToWriteLatency is invoked from EVCacheOperationFuture.signalComplete, + // which runs on the spymemcached IO loop *after* OperationFuture's latch is decremented. + // That means evCache.get() can return before the metric has been recorded. Issue gets + // and poll the timer until one sample shows up, with a generous total budget so slow + // CI hosts can't lose the race. + final long deadlineNs = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); boolean recorded = false; - for (int attempt = 0; attempt < 10 && !recorded; attempt++) { - get(attempt, evCache); - Thread.sleep(50); + int attempt = 0; + while (!recorded && System.nanoTime() < deadlineNs) { + get(attempt++, evCache); + Thread.sleep(100); for (List clients : clientsByServerGroup.values()) { for (EVCacheClient client : clients) { final Id id = EVCacheMetricsFactory.getInstance().getId(EVCacheMetricsFactory.INTERNAL_LOOP_ENQUEUE_TO_WRITE_LATENCY, client.getTagList()); diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java index c2b08083..0c97def9 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java @@ -1705,21 +1705,19 @@ public Counter getOperationCounter() { * Record per-operation in-process latency from when an EVCache future attached * a spymemcached {@link net.spy.memcached.ops.Operation} (immediately before * enqueue into the memcached connection) to when the loop thread finished - * writing the operation to the socket. + * writing the operation to the socket (may be queued at socket). * - *

This is the lagging knee-detector that complements the phase 1 loop CPU - * utilization gauge. It is no-throw on purpose so a bad measurement cannot - * break an operation completion callback. + * Use this to identify if the evcache IO thread is getting busy enough that it + * is impacting transaction latency. This does not completely capture the socket + * to network packet time as there may still be queueing on the socket and NIC. */ public void recordLoopEnqueueToWriteLatency(net.spy.memcached.ops.Operation op, long operationAttachedNs) { if (op == null || operationAttachedNs <= 0L) return; - try { - final long wc = op.getWriteCompleteTimestamp(); - if (wc <= 0L || wc < operationAttachedNs) return; - loopEnqueueToWriteLatency.record(wc - operationAttachedNs, TimeUnit.NANOSECONDS); - } catch (Throwable t) { - if (log.isDebugEnabled()) log.debug("recordLoopEnqueueToWriteLatency failed", t); - } + + final long writeComplete = op.getWriteCompleteTimestamp(); + if (writeComplete <= 0L || writeComplete < operationAttachedNs) return; + + loopEnqueueToWriteLatency.record(writeComplete - operationAttachedNs, TimeUnit.NANOSECONDS); }