Source code for memcache

"""Utilities for caching data in memcache.

TODO: move most or all of this to webutil?
"""
from datetime import datetime, timedelta, timezone
from enum import auto, Enum
import functools
import logging
import os
import re
import string

import config
from google.cloud.ndb._cache import global_cache_key
from google.cloud.ndb.global_cache import _InProcessGlobalCache, MemcacheCache
from google.cloud.ndb.key import Key
from pymemcache.client.base import PooledClient
from pymemcache.serde import PickleSerde
from pymemcache.test.utils import MockMemcacheClient
from webutil import appengine_info, util

from domains import PRIMARY_DOMAIN

logger = logging.getLogger(__name__)

# https://github.com/memcached/memcached/wiki/Commands#standard-protocol
KEY_MAX_LEN = 250

MEMOIZE_VERSION = 2

# per-user rates for running tasks. rate limits and spreads out tasks for bursty
# users. values map protocol label to delay. None means all protocols.
# https://github.com/snarfed/bridgy-fed/issues/1788
PER_USER_TASK_RATES = {
    'receive': {
        None: timedelta(seconds=5),  # all protocols
    },
    'send': {
        'atproto': timedelta(seconds=10),
    },
}

class RateLimitType(Enum):
    LINEAR = auto()
    EXPONENTIAL = auto()

NDB_MEMCACHE_TIMEOUT = timedelta(hours=2)

WHITESPACE_RE = re.compile(f'[{string.whitespace}]')

# https://pymemcache.readthedocs.io/en/latest/apidoc/pymemcache.client.base.html#pymemcache.client.base.Client.__init__
kwargs = {
    'server': os.environ.get('MEMCACHE_HOST', 'localhost'),
    'allow_unicode_keys': True,
    'default_noreply': False,
    'timeout': 10,   # seconds
    'connect_timeout': 10,   # seconds
}

if appengine_info.DEBUG or appengine_info.LOCAL_SERVER:
    logger.info(f'Using in memory mock memcache: {kwargs}')
    memcache = PooledClient(max_pool_size=1, **kwargs)
    pickle_memcache = PooledClient(max_pool_size=1, serde=PickleSerde(), **kwargs)
    memcache.client_class = pickle_memcache.client_class = MockMemcacheClient
    global_cache = _InProcessGlobalCache()
else:
    logger.info(f'Using production Memorystore memcache: {kwargs}')
    memcache = PooledClient(**kwargs)
    pickle_memcache = PooledClient(serde=PickleSerde(), **kwargs)
    global_cache = MemcacheCache(memcache, strict_read=False, strict_write=False)


[docs] def cache_policy(key): """In memory ndb cache. https://github.com/snarfed/bridgy-fed/issues/1149#issuecomment-2261383697 Only cache kinds in memory that are immutable or largely harmless when changed. Keep an eye on this in case we start seeing problems due to this ndb bug where unstored in-memory modifications get returned by later gets: https://github.com/googleapis/python-ndb/issues/888 Args: key (google.cloud.datastore.key.Key or google.cloud.ndb.key.Key): see https://github.com/googleapis/python-ndb/issues/987 Returns: bool: whether to cache this object """ if isinstance(key, Key): # use internal google.cloud.datastore.key.Key # https://github.com/googleapis/python-ndb/issues/987 key = key._key return key and key.kind in ('AtpBlock', 'Object')
def global_cache_policy(key): return True
[docs] def global_cache_timeout_policy(key): """Cache everything for 2h. Args: key (google.cloud.datastore.key.Key or google.cloud.ndb.key.Key): see https://github.com/googleapis/python-ndb/issues/987 Returns: int: cache expiration for this object, in seconds """ if isinstance(key, Key): # use internal google.cloud.datastore.key.Key # https://github.com/googleapis/python-ndb/issues/987 key = key._key if key.kind == 'AtpBlock': return None return int(NDB_MEMCACHE_TIMEOUT.total_seconds())
[docs] def key(key): """Preprocesses a memcache key. Right now just truncates it to 250 chars. https://pymemcache.readthedocs.io/en/latest/apidoc/pymemcache.client.base.html https://github.com/memcached/memcached/wiki/Commands#standard-protocol TODO: truncate to 250 *UTF-8* chars, to handle Unicode chars in URLs. Related: pymemcache Client's allow_unicode_keys constructor kwarg. Args: key (str) Returns: bytes: """ assert isinstance(key, str), repr(key) return WHITESPACE_RE.sub('_', key).encode()[:KEY_MAX_LEN]
def memoize_key(fn, *args, _version=MEMOIZE_VERSION, **kwargs): return key(f'{fn.__qualname__}-{_version}-{repr(args)}-{repr(kwargs)}') NONE = () # empty tuple
[docs] def memoize(expire=None, key=None, write=True, version=MEMOIZE_VERSION): """Memoize function decorator that stores the cached value in memcache. Args: expire (datetime.timedelta): optional, expiration key (callable): function that takes the function's ``(*args, **kwargs)`` and returns the cache key to use. If it returns None, memcache won't be used. write (bool or callable): whether to write to memcache. If this is a callable, it will be called with the function's ``(*args, **kwargs)`` and should return True or False. version (int): overrides our default version number in the memcache key. Bumping this version can have the same effect as clearing the cache for just the affected function. """ expire = int(expire.total_seconds()) if expire else 0 def decorator(fn): @functools.wraps(fn) def wrapped(*args, **kwargs): cache_key = None if key: key_val = key(*args, **kwargs) if key_val: cache_key = memoize_key(fn, key_val, _version=version) else: cache_key = memoize_key(fn, *args, _version=version, **kwargs) if pickle_memcache and cache_key: val = pickle_memcache.get(cache_key) if val is not None: logger.debug(f'cache hit {cache_key} {repr(val)[:100]}') return None if val == NONE else val else: logger.debug(f'cache miss {cache_key}') val = fn(*args, **kwargs) if pickle_memcache and cache_key: write_cache = (write if isinstance(write, bool) else write(*args, **kwargs)) if write_cache: logger.debug(f'cache set {cache_key} {repr(val)[:100]}') pickle_memcache.set(cache_key, NONE if val is None else val, expire=expire) return val return wrapped return decorator
[docs] def evict(entity_key): """Evict a datastore entity from memcache. For :class:`models.User` and :class:`models.Object` entities, also clears their copies from the :func:`models.get_original_user_key` and :func:`models.get_original_object_key` memoize caches. Args: entity_key (google.cloud.ndb.Key) """ if entity := entity_key.get(): for val in getattr(entity, 'copies', []): entity.clear_get_original_cache(val.uri) global_cache.delete([global_cache_key(entity_key._key)])
[docs] def evict_raw(key): """Evict a key from memcache. Args: key (str) Returns: bool: whether the key existed and was deleted """ return memcache.delete(key)
[docs] def remote_evict(entity_key): """Send a request to production Bridgy Fed to evict an entity from memcache. Args: entity_key (google.cloud.ndb.Key) Returns: requests.Response: """ return util.requests_post(f'https://{PRIMARY_DOMAIN}/admin/memcache/evict', headers={'Authorization': config.SECRET_KEY}, data={'key': entity_key.urlsafe()})
[docs] def task_eta(queue, user_id, protocol=None): """Get the ETA to use for a given user's task in a given queue. Task rate limit delays are per user, stored in memcache with a key based on ``queue`` and ``user_id`` and an integer value of POSIX timestamp (UTC) in seconds. Only generates ETAs for task queues in :attr:`PER_USER_TASK_RATES`. Calls for other queues always return ``None``. Background: https://github.com/snarfed/bridgy-fed/issues/1788 Args: queue (str) user_id (str) protocol (str): optional protocol label to look up protocol-specific delay and :class:`RateLimitType` Returns: datetime.datetime: the ETA for this task, or ``None`` if the ETA is now """ from models import PROTOCOLS if not (delays := PER_USER_TASK_RATES.get(queue)): return None # look up delay for protocol, fall back to None (all protocols) if not (delay := delays.get(protocol) or delays.get(None)): return None cache_key = key(f'task-delay-{queue}-{user_id}') now = util.now() if protocol and PROTOCOLS[protocol].RATE_LIMIT_TYPE == RateLimitType.EXPONENTIAL: if eta_s := memcache.get(cache_key): eta = datetime.fromtimestamp(eta_s, timezone.utc) if eta >= now: cur_delay = eta - now new_eta = eta + max(cur_delay, delay) memcache.set(cache_key, int(new_eta.timestamp())) return new_eta else: # linear if eta_s := memcache.incr(cache_key, int(delay.total_seconds())): eta = datetime.fromtimestamp(eta_s, timezone.utc) if eta > now: return eta # incr failed (key doesn't exist) or timestamp is in the past, set it to now # # note that this isn't synchronized; multiple callers may race and both get now # as the returned ETA. that's ok, we don't depend on this for correctness in any # way, just best-effort rate limiting. memcache.set(cache_key, int(now.timestamp())) return now
########################################### # https://github.com/googleapis/python-ndb/issues/743#issuecomment-2067590945 # # fixes "RuntimeError: Key has already been set in this batch" errors due to # tasklets in pages.serve_feed from logging import error as log_error from sys import modules from google.cloud.ndb._cache import ( _GlobalCacheSetBatch, global_compare_and_swap, global_set_if_not_exists, global_watch, ) from google.cloud.ndb.tasklets import Future, Return, tasklet GLOBAL_CACHE_KEY_PREFIX: bytes = modules["google.cloud.ndb._cache"]._PREFIX LOCKED_FOR_READ: bytes = modules["google.cloud.ndb._cache"]._LOCKED_FOR_READ LOCK_TIME: bytes = modules["google.cloud.ndb._cache"]._LOCK_TIME @tasklet def custom_global_lock_for_read(key: str, value: str): if value is not None: yield global_watch(key, value) lock_acquired = yield global_compare_and_swap( key, LOCKED_FOR_READ, expires=LOCK_TIME ) else: lock_acquired = yield global_set_if_not_exists( key, LOCKED_FOR_READ, expires=LOCK_TIME ) if lock_acquired: raise Return(LOCKED_FOR_READ) modules["google.cloud.ndb._cache"].global_lock_for_read = custom_global_lock_for_read