Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion evcache-core/src/main/java/com/netflix/evcache/EVCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@
*
* @author smadappa
*/
public interface EVCache {
public interface EVCache extends EVCacheMetaOperations {
// TODO: Remove Async methods (Project rx) and rename COMPLETABLE_* with ASYNC_*
public static enum Call {
GET, GETL, GET_AND_TOUCH, ASYNC_GET, BULK, SET, DELETE, INCR, DECR, TOUCH, APPEND, PREPEND, REPLACE, ADD, APPEND_OR_ADD, GET_ALL, META_GET, META_SET, META_DEBUG,
META_GET_BULK, META_DELETE,
COMPLETABLE_FUTURE_GET, COMPLETABLE_FUTURE_GET_BULK
};

Expand Down
374 changes: 374 additions & 0 deletions evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package com.netflix.evcache;

import java.util.Collection;
import java.util.Map;

import com.netflix.evcache.EVCacheLatch.Policy;
import com.netflix.evcache.operation.EVCacheItem;
import net.spy.memcached.protocol.ascii.MetaGetBulkOperation;
import net.spy.memcached.protocol.ascii.MetaSetOperation;
import net.spy.memcached.protocol.ascii.MetaDeleteOperation;
import net.spy.memcached.transcoders.Transcoder;

/**
* Additional meta protocol operations for EVCache.
* These methods leverage the advanced capabilities of memcached's meta protocol.
*/
public interface EVCacheMetaOperations {

/**
* Advanced set operation using meta protocol with CAS, conditional operations,
* and atomic features across all replicas.
*
* @param builder Meta set configuration builder
* @param policy Latch policy for coordinating across replicas
* @return EVCacheLatch for tracking operation completion
* @throws EVCacheException if operation fails
*/
default EVCacheLatch metaSet(MetaSetOperation.Builder builder, Policy policy) throws EVCacheException {
throw new EVCacheException("Default implementation. If you are implementing EVCache interface you need to implement this method.");
}

/**
* Retrieve values and metadata for multiple keys using meta protocol.
* Following EVCache bulk operation conventions.
*
* @param keys Collection of keys to retrieve
* @param tc Transcoder for deserialization
* @return Map of key to EVCacheItem containing data and metadata
* @throws EVCacheException if operation fails
*/
default <T> Map<String, EVCacheItem<T>> metaGetBulk(Collection<String> keys, Transcoder<T> tc) throws EVCacheException {
throw new EVCacheException("Default implementation. If you are implementing EVCache interface you need to implement this method.");
}

/**
* Retrieve values and metadata for multiple keys using meta protocol with custom configuration.
*
* @param keys Collection of keys to retrieve
* @param config Configuration for meta get bulk behavior
* @param tc Transcoder for deserialization
* @return Map of key to EVCacheItem containing data and metadata
* @throws EVCacheException if operation fails
*/
default <T> Map<String, EVCacheItem<T>> metaGetBulk(Collection<String> keys, MetaGetBulkOperation.Config config, Transcoder<T> tc) throws EVCacheException {
throw new EVCacheException("Default implementation. If you are implementing EVCache interface you need to implement this method.");
}

/**
* Retrieve values and metadata for multiple keys using meta protocol.
* Varargs convenience method.
*
* @param keys Keys to retrieve
* @return Map of key to EVCacheItem containing data and metadata
* @throws EVCacheException if operation fails
*/
default <T> Map<String, EVCacheItem<T>> metaGetBulk(String... keys) throws EVCacheException {
throw new EVCacheException("Default implementation. If you are implementing EVCache interface you need to implement this method.");
}

/**
* Retrieve values and metadata for multiple keys using meta protocol with custom transcoder.
* Varargs convenience method.
*
* @param tc Transcoder for deserialization
* @param keys Keys to retrieve
* @return Map of key to EVCacheItem containing data and metadata
* @throws EVCacheException if operation fails
*/
default <T> Map<String, EVCacheItem<T>> metaGetBulk(Transcoder<T> tc, String... keys) throws EVCacheException {
throw new EVCacheException("Default implementation. If you are implementing EVCache interface you need to implement this method.");
}

/**
* Advanced delete operation using meta protocol with CAS and conditional operations.
* Supports both deletion and invalidation (marking as stale).
*
* @param builder Meta delete configuration builder
* @param policy Latch policy for coordinating across replicas
* @return EVCacheLatch for tracking operation completion
* @throws EVCacheException if operation fails
*/
default EVCacheLatch metaDelete(MetaDeleteOperation.Builder builder, Policy policy) throws EVCacheException {
throw new EVCacheException("Default implementation. If you are implementing EVCache interface you need to implement this method.");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
import net.spy.memcached.protocol.ascii.MetaGetOperation;
import net.spy.memcached.protocol.ascii.MetaGetOperationImpl;
import net.spy.memcached.protocol.ascii.MetaArithmeticOperationImpl;
import net.spy.memcached.protocol.ascii.MetaSetOperation;
import net.spy.memcached.protocol.ascii.MetaSetOperationImpl;
import net.spy.memcached.protocol.ascii.MetaGetBulkOperation;
import net.spy.memcached.protocol.ascii.MetaGetBulkOperationImpl;
import net.spy.memcached.protocol.ascii.MetaDeleteOperation;
import net.spy.memcached.protocol.ascii.MetaDeleteOperationImpl;
import net.spy.memcached.ops.Mutator;
import net.spy.memcached.ops.MutatorOperation;
import net.spy.memcached.ops.OperationCallback;
Expand All @@ -24,6 +30,19 @@ public MetaGetOperation metaGet(String key, MetaGetOperation.Callback cb) {
return new MetaGetOperationImpl(key, cb);
}

public MetaSetOperation metaSet(MetaSetOperation.Builder builder, MetaSetOperation.Callback cb) {
return new MetaSetOperationImpl(builder, cb);
}

public MetaGetBulkOperation metaGetBulk(MetaGetBulkOperation.Config config, MetaGetBulkOperation.Callback cb) {
return new MetaGetBulkOperationImpl(config, cb);
}

public MetaDeleteOperation metaDelete(MetaDeleteOperation.Builder builder, MetaDeleteOperation.Callback cb) {
return new MetaDeleteOperationImpl(builder, cb);
}


public ExecCmdOperation execCmd(String cmd, ExecCmdOperation.Callback cb) {
return new ExecCmdOperationImpl(cmd, cb);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,22 @@ public int getSizeInBytes() {
return sizeInBytes;
}

/**
* Copy all metadata from another EVCacheItemMetaData instance.
*
* @param source the source metadata to copy from
*/
public void copyFrom(EVCacheItemMetaData source) {
if (source != null) {
this.secondsLeftToExpire = source.secondsLeftToExpire;
this.secondsSinceLastAccess = source.secondsSinceLastAccess;
this.cas = source.cas;
this.hasBeenFetchedAfterWrite = source.hasBeenFetchedAfterWrite;
this.slabClass = source.slabClass;
this.sizeInBytes = source.sizeInBytes;
}
}

@Override
public int hashCode() {
final int prime = 31;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,31 @@ public EVCacheLatchImpl(Policy policy, int _count, String appName) {
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
if (log.isDebugEnabled()) log.debug("Current Latch Count = " + latch.getCount() + "; await for "+ timeout + " " + unit.name() + " appName : " + appName);
final long start = log.isDebugEnabled() ? System.currentTimeMillis() : 0;
final boolean awaitSuccess = latch.await(timeout, unit);
if (log.isDebugEnabled()) log.debug("await success = " + awaitSuccess + " after " + (System.currentTimeMillis() - start) + " msec." + " appName : " + appName + ((evcacheEvent != null) ? " keys : " + evcacheEvent.getEVCacheKeys() : ""));
return awaitSuccess;
final boolean countdownFinished = latch.await(timeout, unit);
if (log.isDebugEnabled()) log.debug("countdown finished = " + countdownFinished + " after " + (System.currentTimeMillis() - start) + " msec." + " appName : " + appName + ((evcacheEvent != null) ? " keys : " + evcacheEvent.getEVCacheKeys() : ""));

// Check if enough operations succeeded (not just completed)
if (!countdownFinished) {
return false; // Timed out
}

// Count how many operations succeeded
int successCount = 0;
for (Future<Boolean> future : futures) {
try {
if (future.isDone() && future.get().equals(Boolean.TRUE)) {
successCount++;
}
} catch (Exception e) {
// Exception means failure
if (log.isDebugEnabled()) log.debug("Future failed with exception", e);
}
}

// Return true only if enough operations succeeded according to policy
final boolean policyMet = successCount >= expectedCompleteCount;
if (log.isDebugEnabled()) log.debug("Policy check: successCount=" + successCount + ", required=" + expectedCompleteCount + ", policyMet=" + policyMet);
return policyMet;
}

/*
Expand Down Expand Up @@ -201,11 +223,15 @@ public void setEVCacheEvent(EVCacheEvent e) {
@Override
public void onComplete(OperationFuture<?> future) throws Exception {
if (log.isDebugEnabled()) log.debug("BEGIN : onComplete - Calling Countdown. Completed Future = " + future + "; App : " + appName);

countDown();
completeCount++;

if(evcacheEvent != null) {
if (log.isDebugEnabled()) log.debug(";App : " + evcacheEvent.getAppName() + "; Call : " + evcacheEvent.getCall() + "; Keys : " + evcacheEvent.getEVCacheKeys() + "; completeCount : " + completeCount + "; totalFutureCount : " + totalFutureCount +"; failureCount : " + failureCount);
try {
Object result = future.isDone() ? future.get() : null;

if(future.isDone() && future.get().equals(Boolean.FALSE)) {
failureCount++;
if(failReason == null) failReason = EVCacheMetricsFactory.getInstance().getStatusCode(future.getStatus().getStatusCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.netflix.evcache.operation.EVCacheItem;
import com.netflix.evcache.operation.EVCacheItemMetaData;
import com.netflix.evcache.operation.EVCacheLatchImpl;
import com.netflix.evcache.operation.EVCacheOperationFuture;
import com.netflix.evcache.pool.observer.EVCacheConnectionObserver;
import com.netflix.evcache.util.EVCacheConfig;
import com.netflix.evcache.util.KeyHasher;
Expand Down Expand Up @@ -1761,6 +1762,37 @@ public <T> EVCacheItem<T> metaGet(String key, Transcoder<T> tc, boolean _throwEx
}


public EVCacheOperationFuture<Boolean> metaDelete(net.spy.memcached.protocol.ascii.MetaDeleteOperation.Builder builder, EVCacheLatchImpl latch) throws Exception {
final String key = builder.getKey();
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
if (!ensureWriteQueueSize(node, key, Call.DELETE)) {
if (log.isInfoEnabled()) log.info("Node : " + node + " is not active. Failing fast and dropping the meta delete event.");
final net.spy.memcached.internal.ListenableFuture<Boolean, net.spy.memcached.internal.OperationCompletionListener> defaultFuture = (net.spy.memcached.internal.ListenableFuture<Boolean, net.spy.memcached.internal.OperationCompletionListener>) getDefaultFuture();
if (latch != null && !isInWriteOnly()) latch.addFuture(defaultFuture);
return (EVCacheOperationFuture<Boolean>) defaultFuture;
}

return evcacheMemcachedClient.metaDelete(builder, latch);
}

public EVCacheOperationFuture<Boolean> metaSet(net.spy.memcached.protocol.ascii.MetaSetOperation.Builder builder, EVCacheLatchImpl latch) throws Exception {
final String key = builder.getKey();
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);

if (!ensureWriteQueueSize(node, key, Call.SET)) {
if (log.isInfoEnabled()) log.info("Node : " + node + " is not active. Failing fast and dropping the meta set event.");
final net.spy.memcached.internal.ListenableFuture<Boolean, net.spy.memcached.internal.OperationCompletionListener> defaultFuture = (net.spy.memcached.internal.ListenableFuture<Boolean, net.spy.memcached.internal.OperationCompletionListener>) getDefaultFuture();
if (latch != null && !isInWriteOnly()) latch.addFuture(defaultFuture);
return (EVCacheOperationFuture<Boolean>) defaultFuture;
}

return evcacheMemcachedClient.metaSet(builder, latch);
}

public EVCacheOperationFuture<Map<String, EVCacheItem<Object>>> metaGetBulk(net.spy.memcached.protocol.ascii.MetaGetBulkOperation.Config config) throws Exception {
return evcacheMemcachedClient.metaGetBulk(config);
}

public void addTag(String tagName, String tagValue) {
final Tag tag = new BasicTag(tagName, tagValue);
if(tags.contains(tag)) return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@ public String getKeyForNode(MemcachedNode node, int repetition) {
public String toString() {
return "EVCacheKetamaNodeLocatorConfiguration [EVCacheClient=" + client + ", BucketSize=" + getNodeRepetitions() + "]";
}
}
}
Loading