Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.netflix.spectator.api.Gauge;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Timer;
import com.netflix.spectator.api.patterns.PolledMeter;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -111,6 +112,38 @@ public void testLoopCpuUtilizationMetricRegistered() throws Exception {
assertTrue(nonZero, "expected loop CPU utilization meter to report a non-zero value");
}

/**
 * Issues gets until the enqueue-to-write latency timer of at least one client
 * for {@code appName} reports a sample, failing if none appears within the
 * polling budget.
 */
@Test(dependsOnMethods = { "testLoopCpuUtilizationMetricRegistered" })
public void testLoopEnqueueToWriteLatencyMetricRecords() throws Exception {
    final Registry registry = EVCacheMetricsFactory.getInstance().getRegistry();
    final Map<ServerGroup, List<EVCacheClient>> clientsByServerGroup =
            manager.getEVCacheClientPool(appName).getAllInstancesByServerGroup();
    assertFalse(clientsByServerGroup.isEmpty(), "expected EVCache clients for " + appName);

    // The latency is recorded from EVCacheOperationFuture.signalComplete on the
    // spymemcached IO loop, after OperationFuture's latch has already been
    // decremented, so evCache.get() may return before the sample exists. Keep
    // issuing gets and re-checking the timers within a generous overall budget
    // so that slow CI hosts cannot lose that race.
    final long deadlineNs = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
    boolean sampleSeen = false;
    int iteration = 0;

    polling:
    while (System.nanoTime() < deadlineNs) {
        get(iteration, evCache);
        iteration++;
        Thread.sleep(100);

        for (List<EVCacheClient> groupClients : clientsByServerGroup.values()) {
            for (EVCacheClient candidate : groupClients) {
                final Id timerId = EVCacheMetricsFactory.getInstance().getId(
                        EVCacheMetricsFactory.INTERNAL_LOOP_ENQUEUE_TO_WRITE_LATENCY,
                        candidate.getTagList());
                final Timer latencyTimer = registry.timer(timerId);
                if (latencyTimer.count() > 0L) {
                    // One sample anywhere is enough; stop polling altogether.
                    sampleSeen = true;
                    break polling;
                }
            }
        }
    }

    assertTrue(sampleSeen, "expected enqueueToWriteLatency timer to record at least one sample");
}

@Test(dependsOnMethods = { "testEVCache" })
public void testKeySizeCheck() throws Exception {
final String key = "This is an invalid key";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ public String getStatusCode(StatusCode sc) {
public static final String INTERNAL_EXECUTOR_SCHEDULED = "internal.evc.client.scheduledExecutor";
public static final String INTERNAL_POOL_INIT_ERROR = "internal.evc.client.init.error";
public static final String INTERNAL_LOOP_CPU_UTILIZATION = "internal.evc.client.loop.cpuUtilization";
// Name of the per-client percentile timer that EVCacheClient creates to record
// the latency between an operation being attached and its socket write completing.
public static final String INTERNAL_LOOP_ENQUEUE_TO_WRITE_LATENCY = "internal.evc.client.loop.enqueueToWriteLatency";

public static final String INTERNAL_NUM_CHUNK_SIZE = "internal.evc.client.chunking.numOfChunks";
public static final String INTERNAL_CHUNK_DATA_SIZE = "internal.evc.client.chunking.dataSize";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,21 @@ public class EVCacheBulkGetFuture<T> extends BulkGetFuture<T> {
private final Collection<Operation> ops;
private final CountDownLatch latch;
// Wall-clock creation time, taken from System.currentTimeMillis() in the constructor.
private final long start;
// Start timestamp supplied at construction (System.nanoTime() when the shorter
// constructor is used); handed to the client as the starting point whenever a
// single chunk operation completes.
private final long operationAttachedNs;
private final EVCacheClient client;
// Initialised to null by the constructor; signalSingleOpComplete stores one
// SingleOperationState per sequence number in it.
private AtomicReferenceArray<SingleOperationState> operationStates;

/**
 * Creates a bulk-get future that uses the moment of construction
 * ({@link System#nanoTime()}) as the start point for latency recording.
 *
 * @param m       map of keys to per-key futures
 * @param getOps  operations that make up this bulk get
 * @param l       latch passed on to the parent future
 * @param service executor passed on to the parent future
 * @param client  client that latency samples are reported to
 */
public EVCacheBulkGetFuture(Map<String, Future<T>> m, Collection<Operation> getOps, CountDownLatch l, ExecutorService service, EVCacheClient client) {
    this(m, getOps, l, service, client, System.nanoTime());
}

/**
 * Creates a bulk-get future with an explicit latency start point.
 *
 * @param m                   map of keys to per-key futures
 * @param getOps              operations that make up this bulk get
 * @param l                   latch passed on to the parent future
 * @param service             executor passed on to the parent future
 * @param client              client that latency samples are reported to
 * @param operationAttachedNs start timestamp later passed to
 *                            {@code client.recordLoopEnqueueToWriteLatency} for every completed chunk
 */
public EVCacheBulkGetFuture(Map<String, Future<T>> m, Collection<Operation> getOps, CountDownLatch l, ExecutorService service, EVCacheClient client, long operationAttachedNs) {
    super(m, getOps, l, service);

    // Timing and reporting state.
    this.start = System.currentTimeMillis();
    this.operationAttachedNs = operationAttachedNs;
    this.client = client;

    // Local references to the collaborators also given to the parent.
    this.rvMap = m;
    this.ops = getOps;
    this.latch = l;

    // Per-chunk state starts out unset.
    this.operationStates = null;
}
Expand Down Expand Up @@ -345,6 +351,7 @@ public void signalComplete() {

/**
 * Marks one chunk of the bulk get as complete: stores its state under the given
 * sequence number and reports the chunk's latency against the shared start
 * timestamp of this future.
 *
 * @param sequenceNo index of the completed chunk
 * @param op         the completed get operation
 */
public void signalSingleOpComplete(int sequenceNo, GetOperation op) {
    final SingleOperationState completedState = new SingleOperationState(op);
    this.operationStates.set(sequenceNo, completedState);
    this.client.recordLoopEnqueueToWriteLatency(op, this.operationAttachedNs);
}

public boolean cancel(boolean ign) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ private static final class LazySharedExecutor {
private final CountDownLatch latch;
private final AtomicReference<T> objRef;
// Operation most recently attached through setOperation().
private Operation op;
// System.nanoTime() reading taken in setOperation() each time an operation is
// attached; passed to the client as the latency start point in signalComplete().
private long operationAttachedNs;
private final String key;
private final long start;
private final EVCacheClient client;
Expand All @@ -93,6 +94,7 @@ public Operation getOperation() {

/**
 * Attaches an operation to this future and remembers when that happened so the
 * latency of the attached operation can be reported on completion.
 *
 * @param to the operation to attach
 */
public void setOperation(Operation to) {
    final long attachedAtNs = System.nanoTime();
    this.op = to;
    this.operationAttachedNs = attachedAtNs;
    // Hand the operation to the parent only after the timestamp is in place.
    super.setOperation(to);
}

Expand Down Expand Up @@ -380,7 +382,11 @@ public void call() {
}

/**
 * Reports the enqueue-to-write latency of the currently attached operation to
 * the client and then delegates to the parent's completion signal.
 */
public void signalComplete() {
// try/finally guarantees the parent's signalComplete() still runs even if
// recording the metric throws; such an exception propagates afterwards.
try {
client.recordLoopEnqueueToWriteLatency(op, operationAttachedNs);
} finally {
super.signalComplete();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Tag;
import com.netflix.spectator.api.Timer;
import com.netflix.spectator.api.patterns.PolledMeter;
import java.io.BufferedInputStream;
import java.io.IOException;
Expand All @@ -32,6 +33,7 @@
import java.net.SocketAddress;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -106,6 +108,7 @@ public class EVCacheClient {
private List<Tag> tags;
private final Map<String, Counter> counterMap = new ConcurrentHashMap<String, Counter>();
// Id under which the loop probe's sampled utilization is registered with PolledMeter.
private final Id loopCpuUtilizationId;
// Percentile timer named INTERNAL_LOOP_ENQUEUE_TO_WRITE_LATENCY, created with this
// client's tags; recordLoopEnqueueToWriteLatency records samples into it.
private final Timer loopEnqueueToWriteLatency;
private final Property<String> hashingAlgo;
protected final Counter operationsCounter;
private final boolean isDuetClient;
Expand Down Expand Up @@ -155,6 +158,9 @@ public class EVCacheClient {
PolledMeter.using(registry)
.withId(loopCpuUtilizationId)
.monitorValue(this.evcacheMemcachedClient.getLoopProbe(), EVCacheLoopProbe::sampleUtilization);
this.loopEnqueueToWriteLatency = EVCacheMetricsFactory.getInstance()
.getPercentileTimer(EVCacheMetricsFactory.INTERNAL_LOOP_ENQUEUE_TO_WRITE_LATENCY,
this.tags, Duration.ofMillis(100));
this.evcacheMemcachedClient.addObserver(connectionObserver);

this.decodingTranscoder = new EVCacheSerializingTranscoder(Integer.MAX_VALUE);
Expand Down Expand Up @@ -1695,6 +1701,25 @@ public Counter getOperationCounter() {
return operationsCounter;
}

/**
 * Record per-operation in-process latency from when an EVCache future attached
 * a spymemcached {@link net.spy.memcached.ops.Operation} (immediately before
 * enqueue into the memcached connection) to when the loop thread finished
 * writing the operation to the socket (it may still be queued at the socket).
 *
 * <p>Use this to identify if the evcache IO thread is getting busy enough that it
 * is impacting transaction latency. This does not completely capture the socket
 * to network packet time as there may still be queueing on the socket and NIC.
 *
 * <p>The attach timestamp is a {@link System#nanoTime()} reading, which has an
 * arbitrary origin and may be negative. For that reason only {@code 0} is
 * treated as "not set", and the two timestamps are compared through their
 * difference rather than with {@code <}.
 *
 * @param op                  the operation whose write-complete timestamp is read;
 *                            nothing is recorded when {@code null}
 * @param operationAttachedNs timestamp taken when the operation was attached;
 *                            nothing is recorded when {@code 0}
 */
public void recordLoopEnqueueToWriteLatency(net.spy.memcached.ops.Operation op, long operationAttachedNs) {
    // 0 is the default value of an unassigned long field, i.e. "never attached".
    if (op == null || operationAttachedNs == 0L) {
        return;
    }

    // NOTE(review): assumes getWriteCompleteTimestamp() stays 0 until the write
    // completes and is otherwise a System.nanoTime() reading - confirm against
    // the spymemcached Operation implementation in use.
    final long writeCompleteNs = op.getWriteCompleteTimestamp();
    if (writeCompleteNs == 0L) {
        return;
    }

    // Subtraction is the overflow-safe way to compare nanoTime readings; a
    // negative result means the write timestamp predates this attach, so skip it.
    final long elapsedNs = writeCompleteNs - operationAttachedNs;
    if (elapsedNs < 0L) {
        return;
    }

    loopEnqueueToWriteLatency.record(elapsedNs, TimeUnit.NANOSECONDS);
}


/**
* Return the keys upto the limit. The key will be cannoicalized key( or hashed Key).<br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ public <T> EVCacheBulkGetFuture<T> asyncGetBulk(Collection<String> plainKeys,
int initialLatchCount = chunks.isEmpty() ? 0 : 1;
final CountDownLatch latch = new CountDownLatch(initialLatchCount);
final Collection<Operation> ops = new ArrayList<Operation>(chunks.size());
final EVCacheBulkGetFuture<T> rv = new EVCacheBulkGetFuture<T>(m, ops, latch, executorService, client);
final EVCacheBulkGetFuture<T> rv = new EVCacheBulkGetFuture<T>(m, ops, latch, executorService, client, System.nanoTime());
rv.setExpectedCount(chunks.size());

final DistributionSummary dataSizeDS = getDataSizeDistributionSummary(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.netflix.evcache.operation;

import com.netflix.evcache.pool.EVCacheClient;
import net.spy.memcached.ops.GetOperation;
import net.spy.memcached.ops.Operation;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

/**
 * Checks how {@link EVCacheBulkGetFuture} reports latency for the chunks of a
 * bulk get.
 */
public class EVCacheBulkGetFutureLatencyTest {

    /**
     * Every completed chunk must be reported to the client together with the
     * single start timestamp the bulk future was constructed with.
     */
    @Test
    public void recordsLatencyAgainstSharedBulkTimestampForEachChunk() {
        final long sharedAttachNs = System.nanoTime();
        final EVCacheClient mockClient = mock(EVCacheClient.class);
        final GetOperation firstChunk = mock(GetOperation.class);
        final GetOperation secondChunk = mock(GetOperation.class);

        final Map<String, Future<Object>> results = new HashMap<String, Future<Object>>();
        final Collection<Operation> operations = new ArrayList<Operation>();
        final CountDownLatch pending = new CountDownLatch(1);

        final EVCacheBulkGetFuture<Object> bulkFuture = new EVCacheBulkGetFuture<Object>(
                results, operations, pending, null, mockClient, sharedAttachNs);
        bulkFuture.setExpectedCount(2);

        // Complete both chunks in sequence order.
        bulkFuture.signalSingleOpComplete(0, firstChunk);
        bulkFuture.signalSingleOpComplete(1, secondChunk);

        verify(mockClient).recordLoopEnqueueToWriteLatency(firstChunk, sharedAttachNs);
        verify(mockClient).recordLoopEnqueueToWriteLatency(secondChunk, sharedAttachNs);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.netflix.evcache.operation;

import com.netflix.evcache.pool.EVCacheClient;
import net.spy.memcached.ops.Operation;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;

/**
 * Checks that {@link EVCacheOperationFuture} reports latency to its client on
 * completion, using the timestamp of the most recently attached operation.
 *
 * <p>Timestamps come from {@link System#nanoTime()}, whose origin is arbitrary
 * and whose values may be negative, so the assertions only require a non-zero
 * value and compare readings through their difference.
 */
public class EVCacheOperationFutureLatencyTest {

    /**
     * A single attach followed by completion must report the attached operation
     * together with a captured (non-zero) attach timestamp.
     */
    @Test
    public void recordsLatencyOnSignalCompleteForSingleOp() {
        EVCacheClient client = mock(EVCacheClient.class);
        Operation op = mock(Operation.class);
        // pretend the write completed shortly after setOperation()
        when(op.getWriteCompleteTimestamp()).thenReturn(System.nanoTime() + 1_000_000L);

        EVCacheOperationFuture<Boolean> future = new EVCacheOperationFuture<Boolean>(
                "key", new CountDownLatch(1), new AtomicReference<Boolean>(null), 1000L, null, client);

        future.setOperation(op);
        future.signalComplete();

        ArgumentCaptor<Long> startCaptor = ArgumentCaptor.forClass(Long.class);
        verify(client).recordLoopEnqueueToWriteLatency(org.mockito.Matchers.eq(op), startCaptor.capture());
        // 0 is the "never set" value; any other nanoTime reading, positive or
        // negative, proves the timestamp was captured.
        assertTrue(startCaptor.getValue() != 0L,
                "expected operationAttachedNs to be captured (non-zero), got " + startCaptor.getValue());
    }

    /**
     * When a second operation is attached (as on a retry), completion must report
     * the second operation with a timestamp no earlier than the first attach.
     */
    @Test
    public void recordsLatencyWithLatestAttachedTimestampOnRetry() throws Exception {
        EVCacheClient client = mock(EVCacheClient.class);
        Operation firstOp = mock(Operation.class);
        Operation retryOp = mock(Operation.class);

        EVCacheOperationFuture<Boolean> future = new EVCacheOperationFuture<Boolean>(
                "key", new CountDownLatch(1), new AtomicReference<Boolean>(null), 1000L, null, client);

        future.setOperation(firstOp);
        long firstStart = System.nanoTime();
        // Small pause so the retry attach is observably later than the first one.
        Thread.sleep(2);
        future.setOperation(retryOp);
        future.signalComplete();

        ArgumentCaptor<Long> startCaptor = ArgumentCaptor.forClass(Long.class);
        verify(client).recordLoopEnqueueToWriteLatency(org.mockito.Matchers.eq(retryOp), startCaptor.capture());
        // Compare nanoTime readings by difference, as the JDK documentation requires.
        assertTrue(startCaptor.getValue() - firstStart >= 0L,
                "expected retry timestamp to be >= first attach time");
    }
}
2 changes: 2 additions & 0 deletions evcache-core/src/test/java/test-suite.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
<classes>
<class name="com.netflix.evcache.pool.NodeLocatorLookupTest" />
<class name="com.netflix.evcache.pool.EVCacheLoopProbeTest" />
<class name="com.netflix.evcache.operation.EVCacheOperationFutureLatencyTest" />
<class name="com.netflix.evcache.operation.EVCacheBulkGetFutureLatencyTest" />
</classes>
</test>
<test name="MockTests">
Expand Down