Source code for models

"""Datastore model classes."""
import copy
from datetime import timedelta, timezone
from functools import cached_property, lru_cache
import itertools
import json
import logging
import random
import re
from threading import Lock
from urllib.parse import quote, urlparse
import csv
import io

from arroba.util import parse_at_uri
import cachetools
from Crypto.PublicKey import RSA
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.serialization import (
    Encoding,
    NoEncryption,
    PrivateFormat,
    PublicFormat,
)
from flask import request
from google.cloud import ndb
from google.cloud.ndb.key import _MAX_KEYPART_BYTES
from granary import as1, as2, atom, bluesky, microformats2
from granary.bluesky import BSKY_APP_URL_RE
import granary.farcaster
from granary.generated.farcaster.message_pb2 import (
    Message,
    MESSAGE_TYPE_USER_DATA_ADD,
)
from granary.generated.farcaster.request_response_pb2 import MessagesResponse
import granary.nostr
from granary.source import html_to_text
import humanize
from lexrpc.base import AT_URI_RE
from requests import RequestException
import secp256k1
from webutil import util
from webutil.appengine_info import DEBUG
from webutil.flask_util import error
from webutil.models import (
    EncryptedProperty,
    JsonProperty,
    stored_value,
    StringIdModel,
)
from webutil.util import ellipsize, json_dumps, json_loads

import common
from common import (
    OLD_ACCOUNT_AGE,
    report_error,
)
import domains
from domains import (
    BLOG_REDIRECT_DOMAINS,
    DOMAIN_BLOCKLIST_CANARIES,
    DOMAIN_RE,
    PRIMARY_DOMAIN,
    PROTOCOL_DOMAINS,
    unwrap,
)
import ids
import memcache

# Maps each string label to its Protocol subclass. All keys are declared here up
# front so that datastore model properties below can use them as `choices`; the
# values start out as None and are filled in by ProtocolUserMeta as each
# Protocol subclass is defined. (We used to let ProtocolUserMeta add the keys
# too, but then the properties with choices had to be overridden in
# reset_model_properties, which was always flaky.)
PROTOCOLS = dict.fromkeys((
    'activitypub',
    'ap',
    'atproto',
    'bsky',
    'nostr',
    'ostatus',
    'web',
    'webmention',
    'ui',
))

# Labels that are only registered when running with DEBUG on.
DEBUG_PROTOCOLS = (
    'fa',
    'fake',
    'efake',
    'farcaster',
    'fc',
    'other',
)
if DEBUG:
    PROTOCOLS.update(dict.fromkeys(DEBUG_PROTOCOLS))

# Maps datastore kind (eg 'MagicKey') to Protocol subclass. Populated by
# ProtocolUserMeta.
PROTOCOLS_BY_KIND = {}

# RSA key size for generated ActivityPub keys. 2048 bits makes tests slow, so
# use 1024 for them.
KEY_BITS = 1024 if DEBUG else 2048
PAGE_SIZE = 20

# auto delete most old objects via the Object.expire property
# https://cloud.google.com/datastore/docs/ttl
#
# need to keep follows because we attach them to Followers and use them for
# unfollows
DONT_EXPIRE_OBJECT_TYPES = as1.ACTOR_TYPES
EXPIRE_LATE_OBJECT_TYPES = as1.POST_TYPES | {
    'block', 'flag', 'follow', 'like', 'share'}
OBJECT_EARLY_EXPIRE_AGE = timedelta(days=2 * 30)
OBJECT_LATE_EXPIRE_AGE = timedelta(days=6 * 30)

GET_ORIGINALS_CACHE_EXPIRATION = timedelta(days=1)
FOLLOWERS_CACHE_EXPIRATION = timedelta(hours=2)

# See https://www.cloudimage.io/
IMAGE_PROXY_URL_BASE = 'https://xaasg3w5.cloudimg.io/'
IMAGE_PROXY_DOMAINS = ('threads.net',)

# used by User.status_description. values are formatted with format(user=...)
USER_STATUS_DESCRIPTIONS = {  # keep in sync with DM.type's docstring!
    'moved': 'account has migrated to another account',
    'no-feed-or-webmention': "web site doesn't have an RSS or Atom feed or webmention endpoint",
    'nobot': "profile has 'nobot' in it",
    'nobridge': "profile has 'nobridge' in it",
    'no-nip05': "account's NIP-05 identifier is missing or invalid",
    'no-profile': 'profile is missing or empty',
    'opt-out': 'account or instance has requested to be opted out',
    'over-handle-domain-limit': "handle's domain has too many users on it",
    'owns-webfinger': 'web site looks like a fediverse instance because it already serves Webfinger',
    'private': 'account is set as private or protected',
    'requires-avatar': "account doesn't have a profile picture",
    'requires-name': "account's name and username are the same",
    # interpolated once, at import time
    'requires-old-account': f"account is less than {humanize.naturaldelta(OLD_ACCOUNT_AGE)} old",
    # plain string: it has no placeholders, so it doesn't need to be an f-string
    'unsupported-handle-ap': "<a href='https://fed.brid.gy/docs#fediverse-get-started'>username has characters that Bridgy Fed doesn't currently support</a>",
}
# used as the User.handle_pay_level_domain value when there are too many users on
# the pay-level domain
OVER_LIMIT = 'too-many'

logger = logging.getLogger(__name__)


class Target(ndb.Model):
    r""":class:`protocol.Protocol` + URI pairs for identifying objects.

    These are currently used for:

    * delivery destinations, eg ActivityPub inboxes, webmention targets, etc.
    * copies of :class:`Object`\s and :class:`User`\s elsewhere,
      eg ``at://`` URIs for ATProto records, nevent etc bech32-encoded Nostr ids,
      ATProto user DIDs, etc.

    Used in :class:`google.cloud.ndb.model.StructuredProperty`\s inside
    :class:`Object` and :class:`User`; not stored as top-level entities in the
    datastore.

    ndb implements this by hoisting each property here into a corresponding
    property on the parent entity, prefixed by the StructuredProperty name
    below, eg ``delivered.uri``, ``delivered.protocol``, etc.

    For repeated StructuredPropertys, the hoisted properties are all repeated on
    the parent entity, and reconstructed into StructuredPropertys based on their
    order.
    """
    uri = ndb.StringProperty(required=True)
    ''
    protocol = ndb.StringProperty(choices=list(PROTOCOLS.keys()), required=True)
    ''

    def __eq__(self, other):
        """Equality excludes :class:`Key`.

        Compares ``uri`` and ``protocol`` only. For non-:class:`Target`
        operands, returns ``NotImplemented`` so Python falls back to its default
        comparison, rather than returning None from ``==``.
        """
        if not isinstance(other, Target):
            return NotImplemented
        return self.uri == other.uri and self.protocol == other.protocol

    def __hash__(self):
        """Allow hashing so these can be dict keys.

        Consistent with :meth:`__eq__`: hashes the same two fields.
        """
        return hash((self.protocol, self.uri))
class DM(ndb.Model):
    """:class:`protocol.Protocol` + type pairs for identifying sent DMs.

    Used in :attr:`User.sent_dms`.

    https://googleapis.dev/python/python-ndb/latest/model.html#google.cloud.ndb.model.StructuredProperty
    """
    type = ndb.StringProperty(required=True)
    """Known values (keep in sync with USER_STATUS_DESCRIPTIONS, the subset for
    ineligible users):

    * dms_not_supported-[RECIPIENT-USER-ID]
    * moved
    * no-feed-or-webmention
    * no-nip05
    * no-profile
    * nobridge
    * opt-out
    * over-handle-domain-limit
    * owns-webfinger
    * private
    * replied_to_bridged_user
    * request_bridging
    * requires-avatar
    * requires-name
    * requires-old-account
    * unsupported-handle-ap
    * welcome
    """
    protocol = ndb.StringProperty(choices=list(PROTOCOLS.keys()), required=True)
    ''

    def __eq__(self, other):
        """Equality excludes :class:`Key`.

        Compares ``type`` and ``protocol``. For non-:class:`DM` operands,
        returns ``NotImplemented`` instead of raising AttributeError, so eg
        ``dm == None`` and membership tests against mixed lists are safe.
        """
        if not isinstance(other, DM):
            return NotImplemented
        return self.type == other.type and self.protocol == other.protocol
class KeyPair(ndb.Model):
    """A user's public/private key pair for a single protocol.

    Used in :attr:`User.keypairs` ; not stored as top-level entities in the
    datastore. The private key is encrypted at rest; the public key is not.

    Format per ``algorithm``:

    * ``rsa``: ``private_key_bytes`` is PKCS#1 PEM
      (``-----BEGIN RSA PRIVATE KEY-----``); ``public_key_bytes`` is SPKI PEM
      (``-----BEGIN PUBLIC KEY-----``).
    * ``secp256k1``: 32 raw bytes private, 32 raw bytes BIP-340 x-only public.
    * ``ed25519``: 32 raw bytes private, 32 raw bytes public.

    Details for each protocol:

    * ActivityPub: RSA
    * ATProto: secp256k1, with ECDSA signatures (keypair is stored in
      :class:`arroba.datastore_storage.AtpRepo`, *not* here)
      https://atproto.com/specs/cryptography
    * Farcaster: Ed25519, with EdDSA signatures
      https://docs.farcaster.xyz/reference/farcaster/intent-urls#resource-urls
    * Nostr: secp256k1, with Schnorr signatures
      https://github.com/nostr-protocol/nips/blob/master/01.md#events-and-signatures
    """
    protocol = ndb.StringProperty(choices=list(PROTOCOLS.keys()), required=True)
    ''
    algorithm = ndb.StringProperty(choices=('ed25519', 'rsa', 'secp256k1'),
                                   required=True)
    ''
    public_key_bytes = ndb.BlobProperty(required=True)
    ''
    private_key_bytes = EncryptedProperty(required=True)
    ''

    def __eq__(self, other):
        """Equality excludes :class:`Key`.

        Compares protocol, algorithm, and both key byte strings. For
        non-:class:`KeyPair` operands, returns ``NotImplemented`` rather than
        returning None from ``==``.
        """
        if not isinstance(other, KeyPair):
            return NotImplemented
        return (self.protocol == other.protocol
                and self.algorithm == other.algorithm
                and self.public_key_bytes == other.public_key_bytes
                and self.private_key_bytes == other.private_key_bytes)
class ProtocolUserMeta(type(ndb.Model)):
    """:class:`User` metaclass. Registers all subclasses in ``PROTOCOLS``.

    A newly created class with a ``LABEL`` (other than the abstract ``protocol``
    and ``user`` labels) is registered in ``PROTOCOLS`` under its ``LABEL``,
    ``ABBREV`` and each of its ``OTHER_LABELS`` (empty ones skipped), and in
    ``PROTOCOLS_BY_KIND`` under its datastore kind. Labels in
    ``DEBUG_PROTOCOLS`` are only registered when ``DEBUG`` is on.
    """
    def __new__(meta, name, bases, class_dict):
        cls = super().__new__(meta, name, bases, class_dict)

        label = getattr(cls, 'LABEL', None)
        if not label or label in ('protocol', 'user'):
            return cls
        if not DEBUG and label in DEBUG_PROTOCOLS:
            return cls

        for alias in (label, cls.ABBREV) + cls.OTHER_LABELS:
            if alias:
                PROTOCOLS[alias] = cls
        PROTOCOLS_BY_KIND[cls._get_kind()] = cls

        return cls
def reset_protocol_properties():
    """Rebuilds module-level values that depend on the contents of ``PROTOCOLS``.

    * Recompiles ``domains.SUBDOMAIN_BASE_URL_RE`` so that every registered
      label (plus ``fed``) is accepted as a ``*.brid.gy`` subdomain and as an
      optional path prefix.
    * Recomputes ``ids.COPIES_PROTOCOLS`` as the labels whose registered
      protocol has ``HAS_COPIES`` set.
    """
    label_alternation = '|'.join(PROTOCOLS)
    abbrevs = f'({label_alternation}|fed)'
    domains.SUBDOMAIN_BASE_URL_RE = re.compile(
        rf'^https?://({abbrevs}\.brid\.gy|localhost(:8080)?)/(convert/|r/)?({abbrevs}/)?(?P<path>.+)')

    copies_labels = []
    for label, proto in PROTOCOLS.items():
        if proto and proto.HAS_COPIES:
            copies_labels.append(label)
    ids.COPIES_PROTOCOLS = tuple(copies_labels)
@lru_cache(maxsize=100000)
@memcache.memoize(expire=GET_ORIGINALS_CACHE_EXPIRATION)
def get_original_object_key(copy_id):
    """Looks up the :class:`Object` that lists ``copy_id`` among its copies.

    Results are cached in-process by :func:`functools.lru_cache` and in
    memcache by :func:`memcache.memoize`. :meth:`Object.add` also updates the
    memcache layer.

    NOTE(review): ``AddRemoveMixin.add``/``remove`` only touch the memcache
    layer, so the outer in-process ``lru_cache`` can keep serving a stale
    result for the life of the process.

    Args:
      copy_id (str): must be non-empty

    Returns:
      google.cloud.ndb.Key or None
    """
    assert copy_id
    query = Object.query(Object.copies.uri == copy_id)
    return query.get(keys_only=True)
@lru_cache(maxsize=100000)
@memcache.memoize(expire=GET_ORIGINALS_CACHE_EXPIRATION)
def get_original_user_key(copy_id):
    """Finds the user with a given copy id, if any.

    Queries the user model of every registered protocol that doesn't itself own
    ``copy_id`` (and isn't ``ui``) for an entity whose ``copies`` include
    ``copy_id``. Returns the first match.

    ``PROTOCOLS`` maps several labels (eg ``ap`` and ``activitypub``) to the
    same class. Classes are de-duplicated, in registration order, so that the
    same datastore query isn't run more than once per lookup.

    Note that :meth:`User.add` also updates this function's
    :func:`memcache.memoize` cache.

    Args:
      copy_id (str): must be non-empty

    Returns:
      google.cloud.ndb.Key or None
    """
    assert copy_id

    # dict.fromkeys de-duplicates while preserving insertion order
    for proto in dict.fromkeys(PROTOCOLS.values()):
        if not proto or proto.LABEL == 'ui' or proto.owns_id(copy_id):
            continue
        if orig := proto.query(proto.copies.uri == copy_id).get(keys_only=True):
            return orig

    return None
class AddRemoveMixin:
    """Mixin class that defines the :meth:`add` and :meth:`remove` methods.

    If a subclass of this mixin defines the ``GET_ORIGINAL_FN`` class-level
    attribute, its memoize cache is updated when :meth:`add` adds a copy, and
    cleared when :meth:`remove` is called with the ``copies`` property.
    Subclasses that don't define ``GET_ORIGINAL_FN`` skip the cache handling.
    """
    lock = None
    """Synchronizes :meth:`add`, :meth:`remove`, etc."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.lock = Lock()

    def add(self, prop, val):
        """Adds a value to a multiply-valued property.

        Args:
          prop (str)
          val

        Returns:
          True if val was added, ie it wasn't already in prop, False otherwise
        """
        with self.lock:
            added = util.add(getattr(self, prop), val)

        if prop == 'copies' and added:
            # GET_ORIGINAL_FN is optional, so default to None
            if fn := getattr(self, 'GET_ORIGINAL_FN', None):
                memcache.pickle_memcache.set(memcache.memoize_key(fn, val.uri),
                                             self.key)

        return added

    def remove(self, prop, val):
        """Removes a value from a multiply-valued property.

        A no-op on the property if ``val`` isn't present. For ``copies``, the
        ``GET_ORIGINAL_FN`` cache entry for ``val.uri`` is cleared either way.

        Args:
          prop (str)
          val
        """
        with self.lock:
            existing = getattr(self, prop)
            if val in existing:
                existing.remove(val)

            if prop == 'copies':
                self.clear_get_original_cache(val.uri)

    def remove_copies_on(self, proto):
        """Removes all copies on a given protocol.

        ``proto.HAS_COPIES`` must be True.

        Args:
          proto (protocol.Protocol subclass)
        """
        assert proto.HAS_COPIES
        labels = (proto.ABBREV, proto.LABEL)

        # Iterate over a snapshot. remove() mutates self.copies in place, and
        # removing from a list while iterating it skips the following element.
        for target in list(self.copies):
            if target.protocol in labels:
                self.remove('copies', target)

    @classmethod
    def clear_get_original_cache(cls, uri):
        """Deletes the memoized ``GET_ORIGINAL_FN`` result for ``uri``, if any.

        Args:
          uri (str)
        """
        if fn := getattr(cls, 'GET_ORIGINAL_FN', None):
            memcache.pickle_memcache.delete(memcache.memoize_key(fn, uri))
# WARNING: AddRemoveMixin *must* be before StringIdModel here so that its __init__
# gets called! Due to an (arguable) ndb.Model bug:
# https://github.com/googleapis/python-ndb/issues/1025
class User(AddRemoveMixin, StringIdModel, metaclass=ProtocolUserMeta):
    """Abstract base class for a Bridgy Fed user.

    Protocol-specific subclasses are registered in ``PROTOCOLS`` by
    :class:`ProtocolUserMeta`.
    """
    # lets AddRemoveMixin keep get_original_user_key's memcache in sync with
    # changes to `copies`
    GET_ORIGINAL_FN = get_original_user_key
    'used by AddRemoveMixin'

    obj_key = ndb.KeyProperty(kind='Object')  # user profile
    ''
    # if set, get_by_id loads and returns this user instead
    use_instead = ndb.KeyProperty()
    ''

    copies = ndb.StructuredProperty(Target, repeated=True)
    """Proxy copies of this user elsewhere, eg DIDs for ATProto records, bech32
    npub Nostr ids, etc. Similar to ``rel-me`` links in microformats2,
    ``alsoKnownAs`` in DID docs (and now AS2), etc.
    """

    # only each KeyPair's private_key_bytes is encrypted; see KeyPair
    keypairs = ndb.StructuredProperty(KeyPair, repeated=True)
    """Key pairs for this user, one per bridged protocol. Encrypted at rest."""

    # checked first by status and is_enabled
    manual_opt_out = ndb.BooleanProperty()
    """Set to True to manually disable this user. Set to False to override spam filters and forcibly enable this user."""

    enabled_protocols = ndb.StringProperty(repeated=True, choices=list(PROTOCOLS.keys()))
    """Protocols that this user has explicitly opted into.

    Protocols that don't require explicit opt in are omitted here.
    """

    has_object_feed_followers_on = ndb.StringProperty(repeated=True, choices=list(PROTOCOLS.keys()))
    """Protocol labels of protocols that use :attr:`~Protocol.USES_OBJECT_FEED` and have ever had a follower of this user."""

    sent_dms = ndb.StructuredProperty(DM, repeated=True)
    """DMs that we've attempted to send to this user."""

    send_notifs = ndb.StringProperty(default='all', choices=('all', 'none'))
    """Which notifications we should send this user."""

    blocks = ndb.KeyProperty(kind='Object', repeated=True)
    ''

    # when set, handle_as_domain returns this instead of deriving it from handle
    verified_domain = ndb.StringProperty()
    """Domain that we've verified this user owns, eg web site top-level NIP-05, etc."""

    created = ndb.DateTimeProperty(auto_now_add=True)
    ''
    updated = ndb.DateTimeProperty(auto_now=True)
    ''

    # `existing` attr is set by get_or_create

    # OLD. some stored entities still have these; do not reuse.
    # direct = ndb.BooleanProperty(default=False)
    # actor_as2 = JsonProperty()
    # protocol-specific state
    # atproto_notifs_indexed_at = ndb.TextProperty()
    # atproto_feed_indexed_at = ndb.TextProperty()
    # mod = ndb.StringProperty()  # https://github.com/snarfed/bridgy-fed/issues/794
    # public_exponent = ndb.StringProperty()
    # private_exponent = ndb.StringProperty()
    # nostr_key_bytes = EncryptedProperty()

    def __init__(self, **kwargs):
        """Constructor.

        Sets :attr:`obj` explicitly because however
        :class:`google.cloud.ndb.model.Model` sets it doesn't work with
        ``@property`` and ``@obj.setter`` below.

        Args:
          kwargs: passed through to the ndb constructor, except ``obj``, which
            is popped and assigned via the :attr:`obj` setter after
            construction
        """
        obj = kwargs.pop('obj', None)
        super().__init__(**kwargs)
        if obj:
            self.obj = obj
[docs] @classmethod def new(cls, **kwargs): """Try to prevent instantiation. Use subclasses instead.""" raise NotImplementedError()
def _post_put_hook(self, future): logger.debug(f'Wrote {self.key}')
[docs] @classmethod def get_by_id(cls, id, allow_opt_out=False, **kwargs): """Override to follow ``use_instead`` property and ``status``. Returns None if the user is opted out. """ user = cls._get_by_id(id, **kwargs) if user and user.use_instead: logger.debug(f'{user.key} use_instead => {user.use_instead}') user = user.use_instead.get() if not user: return None if user.status and not allow_opt_out: logger.info(f'{user.key} is {user.status}') return None return user
[docs] @classmethod def get_or_create(cls, id, propagate=False, allow_opt_out=False, reload=False, raise_=False, **kwargs): """Loads and returns a :class:`User`. Creates it if necessary. If ``allow_opt_out`` is False and ``id`` is the bridged id for a user in another protocol, returns that user instead. Note that they'll be a different type than ``cls``! Not transactional because transactions don't read or write memcache. :/ Fortunately we don't really depend on atomicity for much, last writer wins is usually fine. Args: propagate (bool): whether to create copies of this user in push-based protocols, eg ATProto and Nostr. allow_opt_out (bool): whether to allow and create the user if they're currently opted out reload (bool): whether to reload profile always, vs only if necessary raise_ (bool): passed through to :meth:`User.reload_profile`. If False, and :meth:`User.reload_profile` returns None when fetching the user's profile, this method raises :class:`RuntimeError` kwargs: passed through to ``cls`` constructor Returns: User: existing or new user, or None if the user is opted out """ assert cls != User # TODO? # id = ids.normalize_user_id(id=id, proto=cls) user = cls.get_by_id(id, allow_opt_out=True) if user: # existing if reload: user.reload_profile(gateway=True, raise_=raise_) if user.status and not allow_opt_out: return None user.existing = True # TODO: propagate more fields? 
changed = False for field in ['obj', 'obj_key', 'manual_opt_out']: old_val = getattr(user, field, None) new_val = kwargs.get(field) if old_val is None and new_val is not None: setattr(user, field, new_val) changed = True if enabled_protocols := kwargs.get('enabled_protocols'): user.enabled_protocols = (set(user.enabled_protocols) | set(enabled_protocols)) changed = True if not propagate: if changed: try: user.put() except AssertionError as e: logger.debug(e) error(f'Bad {cls.__name__} id {id} : {e}') return user else: # new, not existing if not allow_opt_out and (orig_key := get_original_user_key(id)): orig = orig_key.get() if orig.status: return None orig.existing = False return orig user = cls(id=id, **kwargs) user.existing = False try: user.reload_profile(gateway=True, raise_=raise_) except AssertionError as e: logger.debug(e) error(f'Bad {cls.__name__} id {id} : {e}') if user.status and not allow_opt_out: return None if propagate and user.status in (None, 'private'): for label in user.enabled_protocols + list(user.DEFAULT_ENABLED_PROTOCOLS): proto = PROTOCOLS[label] if proto == cls: continue elif proto.HAS_COPIES: if not user.get_copy(proto) and user.is_enabled(proto): try: proto.create_for(user) except (ValueError, AssertionError): logger.info(f'failed creating {proto.LABEL} copy', exc_info=True) user.remove('enabled_protocols', proto.LABEL) else: logger.debug(f'{proto.LABEL} not enabled or user copy already exists, skipping propagate') try: user.put() except AssertionError as e: error(f'Bad {cls.__name__} id {id} : {e}') logger.debug(('Updated ' if user.existing else 'Created new ') + str(user)) return user
@property def obj(self): """Convenience accessor that loads :attr:`obj_key` from the datastore.""" if self.obj_key: if not hasattr(self, '_obj'): self._obj = self.obj_key.get() return self._obj @obj.setter def obj(self, obj): if obj: assert isinstance(obj, Object) assert obj.key self._obj = obj self.obj_key = obj.key else: self._obj = self.obj_key = None
[docs] def delete(self, proto=None): """Deletes a user's bridged actors in all protocols or a specific one. Args: proto (Protocol): optional """ now = util.now().isoformat() proto_label = proto.LABEL if proto else 'all' delete_id = f'{self.profile_id()}#bridgy-fed-delete-user-{proto_label}-{now}' delete = Object(id=delete_id, source_protocol=self.LABEL, our_as1={ 'id': delete_id, 'objectType': 'activity', 'verb': 'delete', 'actor': self.key.id(), 'object': self.key.id(), }) self.deliver(delete, from_user=self, to_proto=proto)
[docs] @classmethod def load_multi(cls, users): """Loads :attr:`obj` for multiple users in parallel. Args: users (sequence of User) """ objs = ndb.get_multi(u.obj_key for u in users if u.obj_key) keys_to_objs = {o.key: o for o in objs if o} for u in users: u._obj = keys_to_objs.get(u.obj_key)
    @ndb.ComputedProperty
    def handle(self):
        """This user's unique, human-chosen handle, eg ``@me@snarfed.org``.

        To be implemented by subclasses.
        """
        raise NotImplementedError()

    @ndb.ComputedProperty
    def handle_as_domain(self):
        """This user's handle in domain-like format, via :func:`ids.handle_as_domain`.

        If :attr:`verified_domain` is set, returns that instead.

        Returns:
          str or None: if handle is None
        """
        # Snapshot the previously stored value, once per instance, before any
        # new value is computed. handle_pay_level_domain compares against this
        # snapshot to tell whether the handle has changed.
        if not hasattr(self, '_stored_handle_as_domain'):
            self._stored_handle_as_domain = stored_value(self, 'handle_as_domain')

        if self.verified_domain:
            return self.verified_domain

        return ids.handle_as_domain(self.handle)

    @ndb.ComputedProperty
    def handle_pay_level_domain(self):
        """This user's handle's pay-level domain.

        Pay-level domains are domains at the registrar level, usually (but not
        always) one level below a TLD. For example, bar.com is the pay-level
        domain for both foo.bar.com and baz.bar.com, and bbc.co.uk is the
        pay-level domain for www.bbc.co.uk.

        WARNING: this is only set for the first accounts created before hitting
        the :attr:`Protocol.HANDLES_PER_PAY_LEVEL_DOMAIN` limit for their
        pay-level domain. Accounts after that limit will have 'too-many' as
        their value.

        NOTE(review): implicitly returns None when handle_as_domain is empty or
        when tldextract yields no pay-level domain.
        """
        last_pld = stored_value(self, 'handle_pay_level_domain')
        # Order matters: evaluating self.handle_as_domain on the left populates
        # self._stored_handle_as_domain before the right side reads it.
        if self.handle_as_domain == self._stored_handle_as_domain:
            # handle is unchanged. use our existing stored value for
            # handle_pay_level_domain to avoid re-querying the datastore for other
            # users on the same domain
            return last_pld

        if self.handle_as_domain:
            if extract := domains.tldextract(self.handle_as_domain):
                if cur_pld := extract.top_domain_under_public_suffix:
                    # no limit configured, or we already held this domain's slot
                    if cur_pld == last_pld or not self.HANDLES_PER_PAY_LEVEL_DOMAIN:
                        return cur_pld

                    # count other active users on this pay-level domain. ndb
                    # query filters need `== None`, not `is None`.
                    num_others = self.query(User.handle_pay_level_domain == cur_pld,
                                            User.status == None).count()
                    if num_others < self.HANDLES_PER_PAY_LEVEL_DOMAIN:
                        return cur_pld
                    else:
                        return OVER_LIMIT

    @ndb.ComputedProperty
    def status(self):
        """Whether this user is blocked or opted out. Optional.

        See :attr:`USER_STATUS_DESCRIPTIONS` for possible values. Returns None
        when the user is bridgeable. Checks are ordered: an explicit
        :attr:`manual_opt_out` wins over everything, spam-filter checks
        (``REQUIRES_*``), ``movedTo`` and ``#nobridge`` come before
        :attr:`enabled_protocols`, and explicit opt-in overrides ``private``
        and ``#nobot``.
        """
        # TODO
        # if not hasattr(self, '_stored_status'):
        #     self._stored_status = stored_value(self, 'status')

        if self.manual_opt_out:
            return 'opt-out'
        elif self.manual_opt_out is False:
            # explicitly forced on; skip all other checks
            return None

        # TODO: require profile for more protocols? all?
        if not self.obj or not self.obj.as1:
            return None

        if self.obj.as1.get('bridgeable') is False:  # FEP-0036
            return 'opt-out'

        if self.REQUIRES_AVATAR and not self.obj.as1.get('image'):
            return 'requires-avatar'

        # a display name that's missing or just repeats the handle/id doesn't count
        name = self.obj.as1.get('displayName')
        if self.REQUIRES_NAME and (not name or name in (self.handle, self.key.id())):
            return 'requires-name'

        if self.REQUIRES_OLD_ACCOUNT:
            if published := self.obj.as1.get('published'):
                # NOTE(review): assumes parse_iso8601 returns a datetime
                # comparable with util.now() (both aware or both naive) -- confirm
                if util.now() - util.parse_iso8601(published) < OLD_ACCOUNT_AGE:
                    return 'requires-old-account'

        # https://swicg.github.io/miscellany/#movedTo
        # https://docs.joinmastodon.org/spec/activitypub/#as
        if self.obj.as1.get('movedTo'):
            return 'moved'

        # plain-text versions, for the #nobridge / #nobot keyword checks
        summary = html_to_text(self.obj.as1.get('summary', ''), ignore_links=True)
        name = html_to_text(self.obj.as1.get('displayName', ''), ignore_links=True)

        # #nobridge overrides enabled_protocols
        if '#nobridge' in summary or '#nobridge' in name:
            return 'nobridge'

        if self.HANDLES_PER_PAY_LEVEL_DOMAIN:
            # TODO
            # if self._stored_status:
            #     self._values.pop('handle_pay_level_domain', None)
            if self.handle_pay_level_domain == OVER_LIMIT:
                return 'over-handle-domain-limit'

        # user has explicitly opted in. should go after spam filter (REQUIRES_*)
        # checks, but before is_public and #nobot
        #
        # !!! WARNING: keep in sync with User.enable_protocol!
        if self.enabled_protocols:
            return None

        if not as1.is_public(self.obj.as1, unlisted=False):
            return 'private'

        # enabled_protocols overrides #nobot
        if '#nobot' in summary or '#nobot' in name:
            return 'nobot'
[docs] def status_description(self): """Returns a human-readable description of this user's status. ...or None if this user's status is None, or a description isn't available. Returns: str """ if desc := USER_STATUS_DESCRIPTIONS.get(self.status): return desc.format(user=self)
[docs] def is_enabled(self, to_proto, explicit=False): """Returns True if this user is bridged to a given protocol. Reasons this might return False: * We haven't turned on bridging these two protocols yet. * The user is opted out or blocked. * The user is on a domain that's opted out or blocked. * The from protocol requires opt in, and the user hasn't opted in. * ``explicit`` is True, and this protocol supports ``to_proto`` by, but the user hasn't explicitly opted into it. Args: to_proto (Protocol subclass) explicit (bool) Returns: bool: """ from protocol import Protocol assert isinstance(to_proto, Protocol) or issubclass(to_proto, Protocol) if self.__class__ == to_proto: return True from_label = self.LABEL to_label = to_proto.LABEL if bot_protocol := Protocol.for_bridgy_subdomain(self.key.id()): return to_proto != bot_protocol elif self.manual_opt_out: return False elif to_label in self.enabled_protocols: return True elif self.status: return False elif to_label in self.DEFAULT_ENABLED_PROTOCOLS and not explicit: return True return False
    def enable_protocol(self, to_proto):
        """Adds ``to_proto`` to :attr:`enabled_protocols`.

        Also sends a welcome DM to the user (via a send task) if their protocol
        supports DMs.

        Before enabling, checks that the user's status and handle allow
        bridging to ``to_proto``. If not, it sends an explanatory DM and calls
        :func:`common.error` with status 299 (presumably this aborts the
        request; confirm in common).

        Args:
          to_proto (:class:`protocol.Protocol` subclass)
        """
        import dms

        # explicit opt-in overrides some status
        # !!! WARNING: keep in sync with User.status!
        ineligible = """Hi! Your account isn't eligible for bridging yet because your {desc}. <a href="https://fed.brid.gy/docs#troubleshooting">More details here.</a> You can try again once that's fixed by unfollowing and re-following this account."""
        # 'nobot' and 'private' are the statuses that explicit opt-in overrides
        if self.status and self.status not in ('nobot', 'private'):
            if desc := self.status_description():
                dms.maybe_send(from_=to_proto, to_user=self, type=self.status,
                               text=ineligible.format(desc=desc))
            common.error(f'Nope, user {self.key.id()} is {self.status}', status=299)

        # check that our handle is supported in this protocol
        err = handle = None
        try:
            handle = self.handle_as(to_proto)
        except ValueError as e:
            err = e

        if not handle:
            err_text = str(err) if err else 'handle is unset'
            dms.maybe_send(from_=to_proto, to_user=self,
                           type=f'unsupported-handle-{to_proto.ABBREV}',
                           text=ineligible.format(desc=err_text))
            common.error(err_text, status=299)

        # add to enabled_protocols in memory so that create_for (below) etc see it,
        # including its effects on status, but don't store to datastore until after
        # create_for in case it fails
        self.add('enabled_protocols', to_proto.LABEL)
        if to_proto.LABEL in ids.COPIES_PROTOCOLS:
            # do this even if there's an existing copy since we might need to
            # reactivate it, which create_for should do
            to_proto.create_for(self)

        dms.maybe_send(from_=to_proto, to_user=self, type='welcome', text=f"""Welcome to Bridgy Fed! Your account will soon be bridged to {to_proto.PHRASE} at {self.html_link(proto=to_proto, name=False)}. <a href="https://fed.brid.gy/docs">See the docs</a> and <a href="https://{PRIMARY_DOMAIN}{self.user_page_path()}">your user page</a> for more information.

To disable this and delete your bridged profile, block this account.""")

        # persist only now, after create_for has succeeded
        self.put()
        common.create_task(queue='user-enabled', user=self.key.urlsafe(),
                           protocol=to_proto.LABEL)
        logger.info(f'Enabled {to_proto.LABEL} for {self.key.id()}')
[docs] def disable_protocol(self, to_proto): """Removes ``to_proto` from :attr:`enabled_protocols``. Args: to_proto (:class:`protocol.Protocol` subclass) """ self.remove('enabled_protocols', to_proto.LABEL) self.put() msg = f'Disabled {to_proto.LABEL} for {self.key.id()} : {self.user_page_path()}' logger.info(msg)
[docs] def handle_as(self, to_proto, short=False): """Returns this user's handle in a different protocol. Args: to_proto (str or Protocol) short (bool): whether to return the full handle or a shortened form. Default False. Currently only affects ActivityPub; returns just ``@[user]`` instead of ``@[user]@[domain]`` Returns: str: """ if isinstance(to_proto, str): to_proto = PROTOCOLS[to_proto] # override to-ATProto to use custom domain handle in DID doc from atproto import ATProto, did_to_handle if to_proto == ATProto: if did := self.get_copy(ATProto): if handle := did_to_handle(did, remote=False): return handle # override web users to always use domain instead of custom username # TODO: fall back to id if handle is unset? handle = self.key.id() if self.LABEL == 'web' else self.handle if not handle: return None return ids.translate_handle(handle=handle, from_=self.__class__, to=to_proto, short=short)
[docs] def id_as(self, to_proto): """Returns this user's id in a different protocol. Args: to_proto (str or Protocol) Returns: str """ if isinstance(to_proto, str): to_proto = PROTOCOLS[to_proto] return ids.translate_user_id(id=self.key.id(), from_=self.__class__, to=to_proto)
[docs] def handle_or_id(self): """Returns handle if we know it, otherwise id.""" return self.handle or self.key.id()
def _keypair(self, protocol): """Returns this user's :class:`KeyPair` for ``protocol``, or None.""" for kp in self.keypairs: if kp.protocol == protocol: return kp
[docs] @memcache.memoize(key=lambda self: self.key.id()) def public_pem(self): """Returns the user's PEM-encoded ActivityPub public RSA key. Returns: bytes: """ self._maybe_generate_ap_key() return self._keypair('activitypub').public_key_bytes
[docs] @memcache.memoize(key=lambda self: self.key.id()) def private_pem(self): """Returns the user's PEM-encoded ActivityPub private RSA key. Returns: bytes: """ self._maybe_generate_ap_key() return self._keypair('activitypub').private_key_bytes
def _maybe_generate_ap_key(self): """Generates this user's ActivityPub private key if necessary.""" if self._keypair('activitypub'): return logger.info(f'generating AP keypair for {self.key.id()}') key = RSA.generate(KEY_BITS, randfunc=random.randbytes if DEBUG else None) self.keypairs.append(KeyPair( protocol='activitypub', algorithm='rsa', public_key_bytes=key.publickey().exportKey(format='PEM'), private_key_bytes=key.exportKey(format='PEM'), )) self.put()
[docs] def nsec(self): """Returns the user's bech32-encoded Nostr private secp256k1 key. Returns: str: """ self._maybe_generate_nostr_key() privkey = secp256k1.PrivateKey( self._keypair('nostr').private_key_bytes, raw=True) return granary.nostr.bech32_encode('nsec', privkey.serialize())
[docs] def hex_pubkey(self): """Returns the user's hex-encoded Nostr public secp256k1 key. Returns: str: """ self._maybe_generate_nostr_key() return self._keypair('nostr').public_key_bytes.hex()
[docs] def npub(self): """Returns the user's bech32-encoded ActivityPub public secp256k1 key. Returns: str: """ return granary.nostr.bech32_encode('npub', self.hex_pubkey())
def _maybe_generate_nostr_key(self): """Generates this user's Nostr private key if necessary.""" if self._keypair('nostr'): return logger.info(f'generating Nostr keypair for {self.key.id()}') priv = secp256k1.PrivateKey() pub_hex = granary.nostr.pubkey_from_privkey(priv.private_key.hex()) self.keypairs.append(KeyPair( protocol='nostr', algorithm='secp256k1', public_key_bytes=bytes.fromhex(pub_hex), private_key_bytes=priv.private_key, )) self.put()
[docs] def farcaster_key(self): """Returns the user's Farcaster signing key. TODO: real per-user signer keys, registered on-chain via the KeyRegistry. Messages signed with these stub keys will be rejected by the hub. Returns: cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey: """ self._maybe_generate_farcaster_key() return Ed25519PrivateKey.from_private_bytes( self._keypair('farcaster').private_key_bytes)
def _maybe_generate_farcaster_key(self):
    """Creates and stores a Farcaster Ed25519 keypair unless one already exists.

    No-op if the user already has a ``farcaster`` keypair. Otherwise appends a
    new :class:`KeyPair` with raw-encoded keys to :attr:`keypairs` and writes
    the user.
    """
    if not self._keypair('farcaster'):
        logger.info(f'generating Farcaster keypair for {self.key.id()}')
        private_key = Ed25519PrivateKey.generate()
        # raw key bytes, no PKCS8 / SubjectPublicKeyInfo wrapping
        raw_public = private_key.public_key().public_bytes(
            Encoding.Raw, PublicFormat.Raw)
        raw_private = private_key.private_bytes(
            Encoding.Raw, PrivateFormat.Raw, NoEncryption())
        self.keypairs.append(KeyPair(
            protocol='farcaster',
            algorithm='ed25519',
            public_key_bytes=raw_public,
            private_key_bytes=raw_private,
        ))
        self.put()
def name(self):
    """Returns this user's human-readable name, eg ``Ryan Barrett``.

    Prefers the profile's AS1 ``displayName``; falls back to
    :meth:`handle_or_id` when there's no profile or no display name.
    """
    profile = self.obj.as1 if self.obj else None
    display_name = profile.get('displayName') if profile else None
    return display_name or self.handle_or_id()
def web_url(self):
    """Returns the URL of this user's user-facing profile page.

    For example ``https://bsky.app/profile/snarfed.org`` or
    ``https://foo.com/``. Subclasses must override this.

    Returns:
      str

    Raises:
      NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError()
def is_web_url(self, url, ignore_www=False):
    """Returns True if the given URL is this user's web URL (homepage).

    The comparison ignores surrounding whitespace, trailing slashes, ``http``
    vs ``https``, and a ``www.`` immediately after an ``http://`` or
    ``https://`` scheme on either side. A bare domain (no scheme) that equals
    the domain of this user's :meth:`web_url` also matches.

    Args:
      url (str)
      ignore_www (bool): currently unused. A leading ``www.`` is *always*
        ignored, regardless of this value. Kept for backward compatibility
        with existing callers.

    Returns:
      bool: False for empty URLs and for schemes other than ``http``,
      ``https``, or none
    """
    if not url:
        return False

    url = url.strip().rstrip('/')
    url = re.sub(r'^(https?://)www\.', r'\1', url)
    parsed_url = urlparse(url)
    if parsed_url.scheme not in ('http', 'https', ''):
        return False

    this = self.web_url().rstrip('/')
    this = re.sub(r'^(https?://)www\.', r'\1', this)
    parsed_this = urlparse(this)

    return (url == this
            or url == parsed_this.netloc
            # compare every component except the scheme, ie ignore http vs https
            or parsed_url[1:] == parsed_this[1:])
def id_uri(self):
    """Returns this user's id, expressed as a URI.

    For some protocols this is the id itself, eg ActivityPub actor ids. Others
    differ slightly, eg ``at://did:plc:...`` for ATProto users or
    ``https://site.com`` for Web users, and presumably override this. This
    base implementation returns the key id unchanged.

    Returns:
      str
    """
    key_id = self.key.id()
    return key_id
def profile_id(self):
    """Returns the id of this user's profile object in its native protocol.

    Examples:

    * Web: home page URL, eg ``https://me.com/``
    * ActivityPub: actor URL, eg ``https://instance.com/users/me``
    * ATProto: profile AT URI, eg ``at://did:plc:123/app.bsky.actor.profile/self``

    Delegates to :func:`ids.profile_id` with this user's key id and protocol.

    Returns:
      str or None:
    """
    user_id = self.key.id()
    return ids.profile_id(id=user_id, proto=self)
def is_profile(self, obj):
    """Returns True if ``obj`` is this user's profile/actor, False otherwise.

    Candidate ids are this user's key id, :meth:`profile_id`, and
    :attr:`obj_key`'s id, if set. Empty/None candidates are ignored, so an
    object without an id never matches. ``obj``'s key id is checked first,
    then its AS1 ``id``. For activities whose verb is in ``as1.CRUD_VERBS``,
    the inner object's id is checked instead.

    Args:
      obj (Object)

    Returns:
      bool:
    """
    # profile_id() may return None; don't let that match an id-less object
    self_ids = [i for i in (self.key.id(), self.profile_id()) if i]
    if self.obj_key:
        self_ids.append(self.obj_key.id())

    if obj.key and obj.key.id() in self_ids:
        return True

    if obj_as1 := obj.as1:
        if obj_as1.get('verb') in as1.CRUD_VERBS:
            obj_as1 = as1.get_object(obj_as1)
        return obj_as1.get('id') in self_ids

    return False
def reload_profile(self, raise_=False, **kwargs):
    """Reloads this user's identity and profile from their native protocol.

    Populates the reloaded profile :class:`Object` in ``self.obj``. Always
    writes this user afterward, even when the load fails and ``raise_`` is
    False, so that computed properties get re-populated.

    Args:
      raise_ (bool): passed through to :meth:`Protocol.load`. If True, and
        :meth:`Protocol.load` returns None when fetching the user's profile,
        this method raises :class:`RuntimeError`. If False, a failed load
        leaves ``self.obj`` unchanged.
      kwargs: passed through to :meth:`Protocol.load`

    Raises:
      RuntimeError: if ``raise_`` is True and the user's profile can't be loaded
      AssertionError: if the loaded profile has a type that isn't an AS1
        actor type
    """
    profile_obj_id = self.profile_id()
    obj = self.load(profile_obj_id, remote=True, raise_=raise_, **kwargs)
    if obj:
        if obj.type:
            assert obj.type in as1.ACTOR_TYPES, obj.type
        self.obj = obj
    elif raise_:
        raise RuntimeError(f"Couldn't load {profile_obj_id} on {self.PHRASE}")

    # write the user so that we re-populate any computed properties
    self.put()
def user_page_path(self, rest=None, prefer_id=False):
    """Builds the path to this user's Bridgy Fed user page.

    Args:
      rest (str): optional path and/or query to append. A ``/`` separator is
        inserted unless ``rest`` already starts with ``/`` or ``?``.
      prefer_id (bool): put the account's id in the path instead of its
        handle. Defaults to ``False``.

    Returns:
      str: eg ``/{ABBREV}/{handle}`` plus ``rest``
    """
    ident = self.key.id() if prefer_id else self.handle_or_id()
    base = f'/{self.ABBREV}/{ident}'
    if not rest:
        return base

    separator = '' if rest.startswith(('?', '/')) else '/'
    return f'{base}{separator}{rest}'
def get_copy(self, proto):
    """Finds the id of this user's copy in a given protocol.

    If ``proto`` is this user's own protocol, returns this user's key id.
    Otherwise returns the URI of the first entry in :attr:`copies` whose
    protocol matches ``proto``'s label or abbreviation, or None if there is
    none.

    Args:
      proto: :class:`Protocol` subclass

    Returns:
      str:
    """
    # compare labels, not isinstance, because the testutil Fake protocol has subclasses
    if self.LABEL == proto.LABEL:
        return self.key.id()

    wanted = (proto.LABEL, proto.ABBREV)
    return next((target.uri for target in self.copies if target.protocol in wanted),
                None)
def profile_picture(self):
    """Returns this user's profile picture image URL, or None if unavailable."""
    if not self.obj:
        return None

    profile = self.obj.as1
    return util.get_url(profile, 'image') if profile else None
# can't use functools.lru_cache here because we want the cache key to be
# just the user id, not the whole entity
@cachetools.cached(
    cachetools.TTLCache(50000, FOLLOWERS_CACHE_EXPIRATION.total_seconds()),
    key=lambda user: user.key.id(), lock=Lock())
@memcache.memoize(key=lambda self: self.key.id(),
                  expire=FOLLOWERS_CACHE_EXPIRATION)
def count_followers(self):
    """Counts this user's active followers and active followings.

    Results are cached in-process (TTL cache keyed on user id) and in
    memcache, both expiring after ``FOLLOWERS_CACHE_EXPIRATION``.

    Returns:
      (int, int) tuple: (number of followers, number following). Always
      ``(0, 0)`` for protocol bot users.
    """
    if self.key.id() in PROTOCOL_DOMAINS:
        # we don't store Followers for protocol bot users any more, so
        # follower counts are inaccurate, so don't return them
        return (0, 0)

    # start both count queries before waiting on either
    futures = [
        Follower.query(prop == self.key, Follower.status == 'active').count_async()
        for prop in (Follower.to, Follower.from_)
    ]
    return tuple(future.get_result() for future in futures)
def is_blocking(self, user_or_id):
    """Returns True if this user is blocking ``user_or_id``, False otherwise.

    Looks at domain blocklists in :attr:`blocks`. Eventually we can add support
    for blocking individual users in that too. Blocklist keys whose entities
    no longer exist are skipped.

    Args:
      user_or_id (User or str)

    Returns:
      bool:
    """
    if not user_or_id or not (isinstance(user_or_id, User)
                              or util.is_url(user_or_id)
                              or DOMAIN_RE.fullmatch(user_or_id)):
        return False

    blocklists = ndb.get_multi(key for key in self.blocks
                               if key.kind() == 'Object')
    for blocklist in blocklists:
        # get_multi returns None for keys whose entities don't exist
        if blocklist and blocklist.domain_blocklist_matches(user_or_id):
            logger.info(f'{self.key.id()} is blocking {user_or_id}')
            return True

    return False
def add_domain_blocklist(self, url):
    """Adds a domain blocklist to this user.

    If the blocklist is already in :attr:`blocks`, returns the stored entity.
    Otherwise loads the CSV at ``url``, appends it to :attr:`blocks`, and
    writes this user.

    Args:
      url (str): URL of CSV blocklist to add

    Returns:
      Object: CSV blocklist, or None if it couldn't be loaded
    """
    from web import Web

    existing_key = Object(id=maybe_truncate_key_id(url)).key
    if existing_key in self.blocks:
        return existing_key.get()

    blocklist = Web.load(url, csv=True)
    if not blocklist:
        return None

    self.blocks.append(blocklist.key)
    self.put()
    return blocklist
def remove_domain_blocklist(self, url):
    """Removes a domain blocklist from this user.

    If the blocklist is in :attr:`blocks`, removes it, writes this user, and
    returns the stored entity. Otherwise nothing is removed, and the
    blocklist is loaded from ``url`` only so that it can be returned.

    Args:
      url (str): URL of CSV blocklist to remove

    Returns:
      Object: CSV blocklist, or None if it couldn't be loaded
    """
    from web import Web

    key = Object(id=maybe_truncate_key_id(url)).key
    if key not in self.blocks:
        return Web.load(url, csv=True) or None

    self.blocks.remove(key)
    self.put()
    return key.get()
# WARNING: AddRemoveMixin *must* be before StringIdModel here so that its __init__
# gets called! Due to an (arguable) ndb.Model bug:
# https://github.com/googleapis/python-ndb/issues/1025
class Object(AddRemoveMixin, StringIdModel):
    """An activity or other object, eg actor.

    Key name is the id, generally a URI. We synthesize ids if necessary.
    """
    GET_ORIGINAL_FN = get_original_object_key
    'used by AddRemoveMixin'

    users = ndb.KeyProperty(repeated=True)
    'User(s) who created or otherwise own this object.'

    notify = ndb.KeyProperty(repeated=True)
    """User who should see this in their user page, eg in reply to, reaction to,
    share of, etc.
    """
    feed = ndb.KeyProperty(repeated=True)
    'User who should see this in their feeds, eg followers of its creator'

    source_protocol = ndb.StringProperty(choices=list(PROTOCOLS.keys()))
    """The protocol this object originally came from.

    TODO: nail down whether this is :attr:`ABBREV` or :attr:`LABEL`
    """

    # TODO: switch back to ndb.JsonProperty if/when they fix it for the web console
    # https://github.com/googleapis/python-ndb/issues/874
    as2 = JsonProperty()
    'ActivityStreams 2, for ActivityPub'
    bsky = JsonProperty()
    'AT Protocol lexicon, for Bluesky'
    csv = ndb.TextProperty()
    'Other standalone CSV data, eg domain blocklist.'
    farcaster = ndb.BlobProperty(repeated=True)
    """List of binary serialized Farcaster :class:`Message` protobufs.

    Each blob is ``SerializeToString()`` and decodes via
    ``Message.FromString(blob)``. Repeated to support actors as multiple
    ``USER_DATA_ADD`` messages.
    """
    mf2 = JsonProperty()
    'HTML microformats2 item (*not* top level parse object with ``items`` field)'
    nostr = JsonProperty()
    'Nostr event'
    our_as1 = JsonProperty()
    'ActivityStreams 1, for activities that we generate or modify ourselves'
    raw = JsonProperty()
    'Other standalone data format, eg DID document'
    extra_as1 = JsonProperty()
    "Additional individual fields to merge into this object's AS1 representation"

    is_csv = ndb.BooleanProperty()
    "Whether this object is a CSV. Needed because :attr:`csv` isn't indexed."

    # TODO: remove and actually delete Objects instead!
    deleted = ndb.BooleanProperty()
    ''

    copies = ndb.StructuredProperty(Target, repeated=True)
    """Copies of this object elsewhere, eg at:// URIs for ATProto records and
    nevent etc bech32-encoded Nostr ids, where this object is the original.
    Similar to u-syndication links in microformats2 and
    upstream/downstreamDuplicates in AS1.
    """

    created = ndb.DateTimeProperty(auto_now_add=True)
    ''
    updated = ndb.DateTimeProperty(auto_now=True)
    ''

    new = None
    """True if this object is new, ie this is the first time we've seen it,
    False otherwise, None if we don't know.
    """
    changed = None
    """True if this object's contents have changed from our existing copy in the
    datastore, False otherwise, None if we don't know. See
    :meth:`activity_changed()` for what counts as a change.
    """

    # DEPRECATED
    # These were for full feeds with multiple items, not just this one, so they were
    # stored as audit records only, not used in to_as1. for Atom/RSS
    # based Objects, our_as1 was populated with an feed_index top-level
    # integer field that indexed into one of these.
    #
    # atom = ndb.TextProperty() # Atom XML
    # rss = ndb.TextProperty()  # RSS XML

    # DEPRECATED; these were for delivery tracking, but they were too expensive,
    # so we stopped: https://github.com/snarfed/bridgy-fed/issues/1501
    #
    # STATUSES = ('new', 'in progress', 'complete', 'failed', 'ignored')
    # status = ndb.StringProperty(choices=STATUSES)
    # delivered = ndb.StructuredProperty(Target, repeated=True)
    # undelivered = ndb.StructuredProperty(Target, repeated=True)
    # failed = ndb.StructuredProperty(Target, repeated=True)

    # DEPRECATED but still used read only to maintain backward compatibility
    # with old Objects in the datastore that we haven't bothered migrating.
    #
    # domains = ndb.StringProperty(repeated=True)

    # DEPRECATED; replaced by :attr:`users`, :attr:`notify`, :attr:`feed`
    #
    # labels = ndb.StringProperty(repeated=True,
    #                             choices=('activity', 'feed', 'notification', 'user'))

    @property
    def as1(self):
        """This object's ActivityStreams 1 representation, as a dict, or None.

        Converts the first populated source format, in this priority order:
        :attr:`our_as1`, :attr:`as2`, :attr:`bsky`, :attr:`mf2`, :attr:`nostr`,
        :attr:`farcaster`. Returns None if none are populated, or if Bluesky
        conversion fails. Afterward, fills in ``id`` from the key if missing,
        prefixes ``image`` URLs with ``IMAGE_PROXY_URL_BASE`` when the id's
        domain is in ``IMAGE_PROXY_DOMAINS``, and merges in :attr:`extra_as1`.

        NOTE(review): when :attr:`our_as1` is populated, the dict returned here
        is the :attr:`our_as1` value itself, and the post-processing above
        mutates it in place. Confirm callers are OK with that.
        """
        from protocol import Protocol

        def use_urls_as_ids(obj):
            """If id field is missing or not a URL, use the url field."""
            id = obj.get('id')
            if not id or not (util.is_web(id) or DOMAIN_RE.fullmatch(id)):
                if url := util.get_url(obj):
                    obj['id'] = url

            for field in 'author', 'actor', 'object':
                if inner := as1.get_object(obj, field):
                    use_urls_as_ids(inner)

        if self.our_as1:
            obj = self.our_as1
            if self.source_protocol == 'web':
                use_urls_as_ids(obj)

        elif self.as2:
            obj = as2.to_as1(unwrap(self.as2))

        elif self.bsky:
            owner, _, _ = parse_at_uri(self.key.id())
            ATProto = PROTOCOLS['atproto']
            handle = ATProto(id=owner).handle
            try:
                obj = bluesky.to_as1(self.bsky, repo_did=owner, repo_handle=handle,
                                     uri=self.key.id(), pds=ATProto.pds_for(self))
            except (ValueError, RequestException):
                logger.info("Couldn't convert to AS1", exc_info=True)
                return None

        elif self.mf2:
            obj = microformats2.json_to_object(self.mf2,
                                               rel_urls=self.mf2.get('rel-urls'))
            use_urls_as_ids(obj)

            # use fetched final URL as id, not u-url
            # https://github.com/snarfed/bridgy-fed/issues/829
            if url := self.mf2.get('url'):
                obj['id'] = (self.key.id() if self.key and '#' in self.key.id()
                             else url)

            if self.key and (proto := Protocol.for_bridgy_subdomain(self.key.id())):
                if util.domain_or_parent_in(as1.get_owner(obj), BLOG_REDIRECT_DOMAINS):
                    logger.debug(f'overriding actor/author with {proto.bot_user_id()}')
                    obj['actor'] = obj['author'] = proto.bot_user_id()
                if util.domain_or_parent_in(obj.get('id'), BLOG_REDIRECT_DOMAINS):
                    logger.debug(f'overriding id/url with {self.key.id()}')
                    obj['id'] = obj['url'] = self.key.id()

        elif self.nostr:
            obj = granary.nostr.to_as1(self.nostr)

        elif self.farcaster:
            msgs = [Message.FromString(b) for b in self.farcaster]
            obj = granary.farcaster.to_as1(MessagesResponse(messages=msgs)
                                           if len(msgs) > 1 else msgs[0])

        else:
            return None

        # populate id if necessary
        if self.key:
            obj.setdefault('id', self.key.id())

        if util.domain_or_parent_in(obj.get('id'), IMAGE_PROXY_DOMAINS):
            as1.prefix_urls(obj, 'image', IMAGE_PROXY_URL_BASE)

        if self.extra_as1:
            obj.update(self.extra_as1)

        return obj

    @ndb.ComputedProperty
    def type(self):
        """AS1 objectType, or verb if it's an activity. None if there's no AS1."""
        # compute as1 once; conversion from the source format can be expensive
        if obj_as1 := self.as1:
            return as1.object_type(obj_as1)

    def _expire(self):
        """Automatically delete most Objects after a while using a TTL policy.

        https://cloud.google.com/datastore/docs/ttl

        They recommend not indexing TTL properties:
        https://cloud.google.com/datastore/docs/ttl#ttl_properties_and_indexes

        Returns:
          datetime or None: expiration time, or None to never expire
        """
        now = self.updated or util.now()
        if self.deleted:
            return now + timedelta(days=1)

        if self.key.id().startswith('internal:') or self.raw or self.is_csv:
            return None

        # read the computed type once instead of recomputing AS1 per comparison
        obj_type = self.type
        if obj_type in DONT_EXPIRE_OBJECT_TYPES or self.copies:
            return None
        elif obj_type in EXPIRE_LATE_OBJECT_TYPES:
            return now + OBJECT_LATE_EXPIRE_AGE

        return now + OBJECT_EARLY_EXPIRE_AGE

    expire = ndb.ComputedProperty(_expire, indexed=False)

    def _pre_put_hook(self):
        """Validates and normalizes this object before it's written.

        * Validates that multi-element :attr:`farcaster` lists are all
          ``USER_DATA_ADD`` messages
        * Strips ``@context`` from :attr:`as2` and its ``actor``,
          ``attributedTo``, ``author``, and ``object`` dicts (we don't do LD)
          to save disk space
        * Validates that this object's id, and its :attr:`copies` ids, are
          owned by their protocols, that Nostr ids are ``nostr:[hex]`` ids, and
          that ATProto ids are DIDs or at:// URIs with DID repos
        """
        if len(self.farcaster) > 1:
            for msg in self.farcaster:
                data = Message.FromString(msg).data
                assert data.type == MESSAGE_TYPE_USER_DATA_ADD, \
                    f'multi-element farcaster lists must be all USER_DATA_ADD messages; got {data}'

        if self.as2:
            self.as2.pop('@context', None)
            for field in 'actor', 'attributedTo', 'author', 'object':
                for val in util.get_list(self.as2, field):
                    if isinstance(val, dict):
                        val.pop('@context', None)

        def check_id(id, proto):
            if proto in (None, 'ui'):
                return

            assert PROTOCOLS[proto].owns_id(id) is not False, \
                f'Protocol {PROTOCOLS[proto].LABEL} does not own id {id}'

            if proto == 'nostr':
                assert id.startswith('nostr:'), id
                assert granary.nostr.ID_RE.match(id.removeprefix('nostr:')), id

            elif proto == 'atproto':
                assert id.startswith('at://') or id.startswith('did:'), id
                if id.startswith('at://'):
                    repo, _, _ = parse_at_uri(id)
                    if not repo.startswith('did:'):
                        # TODO: if we hit this, that means the AppView gave us an AT
                        # URI with a handle repo/authority instead of DID. that's
                        # surprising! ...if so, and if we need to handle it, add a
                        # new arroba.did.canonicalize_at_uri() function, then use it
                        # here, or before.
                        raise ValueError(
                            f'at:// URI ids must have DID repos; got {id}')

        check_id(self.key.id(), self.source_protocol)
        for target in self.copies:
            check_id(target.uri, target.protocol)

    def _post_put_hook(self, future):
        # TODO: assert that as1 id is same as key id? in pre put hook?
        logger.debug(f'Wrote {self.key}')

    @classmethod
    def get_by_id(cls, id, authed_as=None, **kwargs):
        """Fetches the :class:`Object` with the given id, if it exists.

        Args:
          id (str)
          authed_as (str): optional; if provided, and a matching :class:`Object`
            already exists, its ``author`` or ``actor`` must contain this actor
            id. Implements basic authorization for updates and deletes.

        Returns:
          Object:

        Raises:
          :class:`werkzeug.exceptions.Forbidden` if ``authed_as`` doesn't match
            the existing object
        """
        obj = super().get_by_id(maybe_truncate_key_id(id), **kwargs)

        if obj and authed_as and (obj_as1 := obj.as1):
            # authorization: check that the authed user is allowed to modify
            # this object
            # https://www.w3.org/wiki/ActivityPub/Primer/Authentication_Authorization
            proto = obj.owner_protocol()
            assert proto, obj.source_protocol

            owners = [ids.normalize_user_id(id=owner, proto=proto)
                      for owner in (as1.get_ids(obj_as1, 'author')
                                    + as1.get_ids(obj_as1, 'actor'))
                      if owner]

            # actors own themselves
            if obj.type in as1.ACTOR_TYPES:
                owners.append(id)

            user_id = ids.normalize_user_id(id=authed_as, proto=proto)
            profile_id = ids.profile_id(id=authed_as, proto=proto)
            if (owners
                    and user_id not in owners
                    and profile_id not in owners
                    and authed_as not in (PRIMARY_DOMAIN,) + PROTOCOL_DOMAINS):
                report_error("Auth: Object: authed_as doesn't match owner",
                             user=f'{user_id} {profile_id} authed_as {authed_as} owners {owners}')
                error(f"authed user {authed_as} ({user_id} {profile_id}) isn't object owner {owners}",
                      status=403)

        return obj

    @classmethod
    def get_or_create(cls, id, authed_as=None, **props):
        """Returns an :class:`Object` with the given property values.

        If a matching :class:`Object` doesn't exist in the datastore, creates it
        first with all of ``props``. For an existing object, ignores ``None``
        values in ``props``; merges the repeated ``copies``, ``feed``,
        ``notify``, and ``users`` values instead of overwriting them; and when a
        source-format property (``as2``, ``bsky``, ``csv``, ``farcaster``,
        ``mf2``, ``nostr``, ``raw``) changes, clears :attr:`our_as1` unless a new
        ``our_as1`` is also passed. Writes only if something changed.

        Also populates the :attr:`new` and :attr:`changed` properties.

        Not transactional because transactions don't read or write memcache. :/
        Fortunately we don't really depend on atomicity for much, last writer wins
        is usually fine.

        Args:
          authed_as (str): optional; if provided, and a matching :class:`Object`
            already exists, its ``author`` or ``actor`` must contain this actor
            id. Implements basic authorization for updates and deletes.

        Returns:
          Object:

        Raises:
          :class:`werkzeug.exceptions.Forbidden` if ``authed_as`` doesn't match
            the existing object
        """
        key_id = maybe_truncate_key_id(id)
        obj = cls.get_by_id(key_id, authed_as=authed_as)

        if not obj:
            obj = Object(id=key_id, **props)
            obj.new = True
            obj.changed = False
            obj.put()
            return obj

        if orig_as1 := obj.as1:
            # get_by_id() checks authorization if authed_as is set. make sure
            # it's always set for existing objects.
            assert authed_as

        dirty = False
        for prop, val in props.items():
            assert not isinstance(getattr(Object, prop), ndb.ComputedProperty)
            if prop in ('copies', 'feed', 'notify', 'users'):
                # merge repeated fields
                for elem in val:
                    if obj.add(prop, elem):
                        dirty = True
            elif val is not None and val != getattr(obj, prop):
                setattr(obj, prop, val)
                # new source data invalidates our own previously generated AS1
                if (prop in ('as2', 'bsky', 'csv', 'farcaster', 'mf2', 'nostr', 'raw')
                        and not props.get('our_as1')):
                    obj.our_as1 = None
                dirty = True

        obj.new = False
        obj.changed = obj.activity_changed(orig_as1)
        if dirty:
            obj.put()
        return obj

    @staticmethod
    def from_request():
        """Creates and returns an :class:`Object` from form-encoded JSON parameters.

        Parameters:
          obj_id (str): id of :class:`models.Object` to handle
          *: If ``obj_id`` is unset, all other parameters are properties for a
            new :class:`models.Object` to handle
        """
        if obj_id := request.form.get('obj_id'):
            return Object.get_by_id(obj_id)

        props = {field: request.form.get(field)
                 for field in ('id', 'source_protocol')}

        for json_prop in 'as2', 'bsky', 'mf2', 'our_as1', 'nostr', 'raw':
            if val := request.form.get(json_prop):
                props[json_prop] = json_loads(val)

        obj = Object(**props)
        if not obj.key and obj.as1:
            if as1_id := obj.as1.get('id'):
                obj.key = ndb.Key(Object, as1_id)

        return obj

    def to_request(self):
        """Returns a query parameter dict representing this :class:`Object`.

        Inverse of :meth:`from_request`: JSON-encodes the populated ``as2``,
        ``bsky``, ``mf2``, ``nostr``, ``our_as1``, and ``raw`` properties and
        includes ``source_protocol`` and ``id`` when set.
        """
        form = {}

        for json_prop in 'as2', 'bsky', 'mf2', 'nostr', 'our_as1', 'raw':
            if val := getattr(self, json_prop, None):
                form[json_prop] = json_dumps(val, sort_keys=True)

        if self.source_protocol:
            form['source_protocol'] = self.source_protocol

        if self.key:
            form['id'] = self.key.id()

        return form

    def activity_changed(self, other_as1):
        """Returns True if this activity is meaningfully changed from ``other_as1``.

        ...otherwise False.

        Used to populate :attr:`changed`.

        Args:
          other_as1 (dict): AS1 object, or None
        """
        # ignore inReplyTo since we translate it between protocols
        return (as1.activity_changed(self.as1, other_as1, inReplyTo=False)
                if self.as1 and other_as1
                else bool(self.as1) != bool(other_as1))

    def get_copy(self, proto):
        """Returns the id for the copy of this object in a given protocol.

        ...or None if no such copy exists. If ``proto`` is ``source_protocol``,
        returns this object's key id.

        TODO: for some protocols, we should try harder to find the *right* copy
        id. Eg if copies has some old garbage entries for this protocol, and we
        can tell that they don't belong to the user's copy account in this
        protocol, eg if the DID in the at:// URI doesn't match, we should skip
        those and look for the matching copy. We'd need the user here though.
        This would help with or fix:
        https://console.cloud.google.com/errors/detail/COK22a6w4O2JVg;locations=global;time=P30D?project=bridgy-federated

        Args:
          proto: :class:`Protocol` subclass

        Returns:
          str:
        """
        copies = self.get_copies(proto)
        return copies[0] if copies else None

    def get_copies(self, proto):
        """Returns all ids of copies of this object in a given protocol.

        If ``proto`` is ``source_protocol``, returns this object's key id.

        Args:
          proto: :class:`Protocol` subclass

        Returns:
          list of str:
        """
        if self.source_protocol in (proto.LABEL, proto.ABBREV):
            return [self.key.id()]

        return [target.uri for target in self.copies
                if target.protocol in (proto.LABEL, proto.ABBREV)]

    def resolve_ids(self):
        """Replaces "copy" ids, subdomain ids, etc with their originals.

        The end result is that all ids are original "source" ids, ie in the
        protocol that they first came from.

        Specifically, resolves:

        * ids in :class:`User.copies` and :class:`Object.copies`, eg ATProto
          records and Nostr events that we bridged, to the ids of their
          original objects in their source protocol, eg
          ``at://did:plc:abc/app.bsky.feed.post/123`` => ``https://mas.to/@user/456``.
        * Bridgy Fed subdomain URLs to the ids embedded inside them, eg
          ``https://bsky.brid.gy/ap/did:plc:xyz`` => ``did:plc:xyz``
        * ATProto bsky.app URLs to their DIDs or `at://` URIs, eg
          ``https://bsky.app/profile/a.com`` => ``did:plc:123``

        ...in these AS1 fields, in place:

        * ``id``
        * ``actor``
        * ``author``
        * ``object``
        * ``object.actor``
        * ``object.author``
        * ``object.id``
        * ``object.inReplyTo``
        * ``attachments.[objectType=note].id``
        * ``tags.[objectType=mention].url``

        Values without an id, eg id-less inline objects, are left as is.

        :meth:`protocol.Protocol.translate_ids` is partly the inverse of this.
        Much of the same logic is duplicated there!

        TODO: unify with :meth:`normalize_ids`, :meth:`Object.normalize_ids`.
        """
        if not self.as1:
            return

        # extract ids, strip Bridgy Fed subdomain URLs
        outer_obj = unwrap(self.as1)
        if outer_obj != self.as1:
            self.our_as1 = util.trim_nulls(outer_obj)

        self_proto = PROTOCOLS.get(self.source_protocol)
        if not self_proto:
            return

        logger.debug(f'Resolving ids for {self.key.id()}')
        inner_obj = outer_obj['object'] = as1.get_object(outer_obj)
        replaced = False

        def replace(val, orig_fn):
            """Returns ``val`` with its copy id resolved to the original, if any."""
            nonlocal replaced

            id = val.get('id') if isinstance(val, dict) else val
            if not id or not self_proto.HAS_COPIES:
                # return val, not id, so id-less dicts aren't dropped
                return val

            orig = orig_fn(id)
            if not orig:
                return val

            replaced = True
            logger.debug(f'Resolved copy id {val} to original {orig.id()}')

            if isinstance(val, dict) and util.trim_nulls(val).keys() > {'id'}:
                val['id'] = orig.id()
                return val
            else:
                return orig.id()

        # actually replace ids
        #
        # object field could be either object (eg repost) or actor (eg follow)
        # TODO: handle better
        # https://github.com/snarfed/bridgy-fed/issues/2281
        outer_obj['object'] = replace(inner_obj, get_original_object_key)
        if not replaced:
            outer_obj['object'] = replace(inner_obj, get_original_user_key)

        for obj in outer_obj, inner_obj:
            for tag in as1.get_objects(obj, 'tags'):
                if tag.get('objectType') == 'mention':
                    tag['url'] = replace(tag.get('url'), get_original_user_key)

            for att in as1.get_objects(obj, 'attachments'):
                if att.get('objectType') == 'note':
                    att['id'] = replace(att.get('id'), get_original_object_key)

            for field, fn in (
                    ('actor', get_original_user_key),
                    ('author', get_original_user_key),
                    ('inReplyTo', get_original_object_key),
            ):
                obj[field] = [replace(val, fn) for val in util.get_list(obj, field)]
                if len(obj[field]) == 1:
                    obj[field] = obj[field][0]

        if replaced:
            self.our_as1 = util.trim_nulls(outer_obj)

    def normalize_ids(self):
        """Normalizes ids to their protocol's canonical representation, if any.

        For example, normalizes ATProto ``https://bsky.app/...`` URLs to DIDs
        for profiles, ``at://`` URIs for posts.

        Modifies this object in place: only writes :attr:`our_as1` if at least
        one id actually changed.

        TODO: unify with :meth:`resolve_ids`, :meth:`Protocol.translate_ids`.
        """
        from protocol import Protocol

        if not self.as1:
            return

        logger.debug(f'Normalizing ids for {self.key.id()}')
        outer_obj = copy.deepcopy(self.as1)
        inner_objs = as1.get_objects(outer_obj)
        replaced = False

        def replace(val, translate_fn):
            """Returns ``val`` with its id translated to canonical form, if any."""
            nonlocal replaced

            orig = val.get('id') if isinstance(val, dict) else val
            if not orig:
                return val

            proto = Protocol.for_id(orig, remote=False)
            if not proto:
                return val

            translated = translate_fn(id=orig, from_=proto, to=proto)
            if translated and translated != orig:
                # logger.debug(f'Normalized {proto.LABEL} id {orig} to {translated}')
                replaced = True
                if isinstance(val, dict):
                    val['id'] = translated
                    return val
                else:
                    return translated

            return val

        # actually replace ids
        for obj in [outer_obj] + inner_objs:
            for tag in as1.get_objects(obj, 'tags'):
                if tag.get('objectType') == 'mention':
                    tag['url'] = replace(tag.get('url'), ids.translate_user_id)
            for field in ['actor', 'author', 'inReplyTo']:
                fn = (ids.translate_object_id if field == 'inReplyTo'
                      else ids.translate_user_id)
                obj[field] = [replace(val, fn) for val in util.get_list(obj, field)]
                if len(obj[field]) == 1:
                    obj[field] = obj[field][0]

        # the outer object's type doesn't depend on its object field, so pick
        # the translation function once, outside the loop
        inner_translate_fn = (
            ids.translate_user_id
            if as1.object_type(outer_obj) in as1.VERBS_WITH_ACTOR_OBJECT
            else ids.translate_object_id)

        outer_obj['object'] = []
        for inner_obj in inner_objs:
            got = replace(inner_obj, inner_translate_fn)
            if isinstance(got, dict) and util.trim_nulls(got).keys() == {'id'}:
                got = got['id']

            outer_obj['object'].append(got)

        if len(outer_obj['object']) == 1:
            outer_obj['object'] = outer_obj['object'][0]

        if replaced:
            self.our_as1 = util.trim_nulls(outer_obj)

    def owner_protocol(self):
        """Wrapper around :attr:`source_protocol` that handles :class:`UIProtocol`.

        Returns:
          Protocol subclass: :attr:`source_protocol` *unless* it's None or
            :class:`UIProtocol`, in which case infers and returns ``author``'s or
            ``actor``'s protocol instead.
        """
        from protocol import Protocol

        if self.source_protocol in (None, 'ui'):
            return Protocol.for_id(as1.get_owner(self.as1))

        return PROTOCOLS.get(self.source_protocol)

    @cached_property
    def domain_blocklist(self):
        """Returns the domains in the domain blocklist in :attr:`raw` or :attr:`csv`.

        If :attr:`raw` is a list, returns its entries with ``#`` comments
        stripped, whitespace trimmed, and lower-cased. Otherwise extracts the
        ``domain`` or ``#domain`` column from :attr:`csv`, skipping empty values
        and ``DOMAIN_BLOCKLIST_CANARIES``.

        TODO: unify with :meth:`filters.blocklist_items`

        Returns:
          list of str: domain names, or empty list if neither :attr:`raw` nor
          :attr:`csv` is populated or parseable.
        """
        assert not (self.raw and self.csv)

        if self.raw:
            return [val.split('#')[0].strip().lower() for val in self.raw]

        if not self.csv:
            return []

        # DictReader parses lazily: csv.Error surfaces when reading the header
        # (fieldnames) or rows, not when constructing the reader, so all of it
        # has to be inside the try
        try:
            reader = csv.DictReader(io.StringIO(self.csv))
            fieldnames = reader.fieldnames or ()
            if 'domain' in fieldnames:
                col = 'domain'
            elif '#domain' in fieldnames:
                col = '#domain'
            else:
                return []

            return [row[col] for row in reader
                    if row[col] and row[col] not in DOMAIN_BLOCKLIST_CANARIES]
        except csv.Error:
            return []

    def domain_blocklist_matches(self, user_or_id):
        """Returns True if ``user_or_id`` is in this domain blocklist, False otherwise.

        For users, looks at id, handle, and delivery target. Domains in
        ``domains.DOMAINS`` (or their subdomains) never match.

        Args:
          user_or_id (User or str)

        Returns:
          bool:

        Raises:
          AssertionError: if this object is not a domain blocklist
        """
        assert self.is_csv or self.csv or isinstance(self.raw, list)

        if isinstance(user_or_id, User):
            user = user_or_id
            inputs = [user.key.id(), user.handle_as_domain]
            if user.obj:
                inputs.append(user.target_for(user.obj))
        else:
            inputs = [user_or_id]

        for input in inputs:
            if domain := util.domain_from_link(input):
                if (util.domain_or_parent_in(domain, self.domain_blocklist)
                        and not util.domain_or_parent_in(domain, domains.DOMAINS)):
                    logger.info(f'{input} matches domain blocklist {self.key.id()}')
                    return True

        return False
class Follower(ndb.Model):
    """A follower of a Bridgy Fed user."""
    STATUSES = ('active', 'inactive', 'dormant')
    REASONS = ('requested', 'bounce')

    from_ = ndb.KeyProperty(name='from', required=True)
    """The follower."""
    to = ndb.KeyProperty(required=True)
    """The followee, ie the user being followed."""
    follow = ndb.KeyProperty(Object)
    """The last follow activity."""
    status = ndb.StringProperty(choices=STATUSES, default='active')
    """Whether this follow is active or not.

    ``dormant`` means the followee isn't bridged (yet), so the follow can't be
    delivered. If they enable the bridge, we notify the follower.
    """
    reason = ndb.StringProperty(choices=REASONS)
    """Optional explanation for this follow's :attr:`status`, eg why it's
    dormant. One of :attr:`REASONS`."""

    created = ndb.DateTimeProperty(auto_now_add=True)
    updated = ndb.DateTimeProperty(auto_now=True)

    # OLD. some stored entities still have these; do not reuse.
    # src = ndb.StringProperty()
    # dest = ndb.StringProperty()
    # last_follow = JsonProperty()

    def _pre_put_hook(self):
        # we're a bridge! stick with bridging.
        assert self.from_.kind() != self.to.kind(), f'from {self.from_} to {self.to}'

    def _post_put_hook(self, future):
        logger.debug(f'Wrote {self.key}')

    @classmethod
    def get_or_create(cls, *, from_, to, **kwargs):
        """Fetches or creates the Follower from ``from_`` to ``to``, then writes it.

        If no matching :class:`Follower` exists, creates one with ``kwargs`` as
        extra property values. Otherwise applies ``kwargs`` to the existing
        entity, eg to make an inactive Follower active again, except that an
        ``active`` Follower is never downgraded to ``dormant``.

        Not transactional because transactions don't read or write memcache. :/
        Fortunately we don't really depend on atomicity for much, last writer
        wins is usually fine.

        Args:
          from_ (User or Key)
          to (User or Key)

        Returns:
          Follower:
        """
        def as_key(user_or_key):
            return user_or_key if isinstance(user_or_key, ndb.Key) else user_or_key.key

        from_key = as_key(from_)
        to_key = as_key(to)
        assert from_key
        assert to_key

        existing = Follower.query(Follower.from_ == from_key,
                                  Follower.to == to_key,
                                  ).get()
        if not existing:
            follower = Follower(from_=from_key, to=to_key, **kwargs)
        else:
            follower = existing
            for prop, val in kwargs.items():
                downgrade = (prop == 'status' and val == 'dormant'
                             and follower.status == 'active')
                if not downgrade:
                    setattr(follower, prop, val)

        follower.put()
        return follower

    @staticmethod
    def fetch_page(collection, user):
        r"""Fetches a page of active :class:`Follower`\s for a given user.

        Wraps the module-level :func:`fetch_page`. Paging uses the ``before``
        and ``after`` query parameters, if available in the request.

        Args:
          collection (str): ``followers`` or ``following``
          user (User)

        Returns:
          (list of Follower, str, str) tuple: results, annotated with an extra
          ``user`` attribute that holds the follower or following :class:`User`,
          and new str query param values for ``before`` and ``after`` to fetch
          the previous and next pages, respectively. Entries whose other user
          can't be loaded are dropped.
        """
        assert collection in ('followers', 'following'), collection
        want_followers = collection == 'followers'

        filter_prop = Follower.to if want_followers else Follower.from_
        query = Follower.query(
            Follower.status == 'active',
            filter_prop == user.key,
        )

        followers, before, after = fetch_page(query, Follower, by=Follower.updated)
        other_keys = [f.from_ if want_followers else f.to for f in followers]
        others = ndb.get_multi(other_keys)
        User.load_multi(other for other in others if other)

        for follower, other in zip(followers, others):
            follower.user = other
        followers = [f for f in followers if f.user]

        # only show followers in protocols that this user is bridged into
        if want_followers:
            followers = [f for f in followers if user.is_enabled(f.user)]

        return followers, before, after
def fetch_objects(query, by=None, user=None, max_age=None):
    """Fetches a page of :class:`Object` entities from a datastore query.

    Wraps :func:`fetch_page` and adds attributes to the returned :class:`Object`
    entities for rendering in ``objects.html``: ``phrase``, ``content``,
    ``url``, and for some objects ``inner_url``. Non-public and deleted objects
    are dropped.

    Args:
      query (ndb.Query)
      by (ndb.model.Property): either :attr:`Object.updated` or
        :attr:`Object.created`
      user (User): current user
      max_age (datetime.timedelta): passed through to :func:`fetch_page`

    Returns:
      (list of Object, str, str) tuple:
      (results, new ``before`` query param, new ``after`` query param)
      to fetch the previous and next pages, respectively
    """
    assert by is Object.updated or by is Object.created
    objects, new_before, new_after = fetch_page(query, Object, by=by,
                                                max_age=max_age)
    objects = [o for o in objects if as1.is_public(o.as1) and not o.deleted]

    # AS1 verb/objectType => human-readable phrase. loop-invariant, so build once.
    phrases = {
        'accept': 'accepted',
        'article': 'posted',
        'comment': 'replied',
        'delete': 'deleted',
        'follow': 'followed',
        'invite': 'is invited to',
        'issue': 'filed issue',
        'like': 'liked',
        'note': 'posted',
        'post': 'posted',
        'repost': 'reposted',
        'rsvp-interested': 'is interested in',
        'rsvp-maybe': 'might attend',
        'rsvp-no': 'is not attending',
        'rsvp-yes': 'is attending',
        'share': 'reposted',
        'stop-following': 'unfollowed',
        'undo': 'undid',
        'update': 'updated',
    }
    phrases.update({actor_type: 'profile refreshed:'
                    for actor_type in as1.ACTOR_TYPES})

    # synthesize human-friendly content for objects
    for obj in objects:
        obj_as1 = obj.as1
        obj_type = as1.object_type(obj_as1)
        obj.phrase = phrases.get(obj_type, '')

        raw_content = (obj_as1.get('content')
                       or obj_as1.get('displayName')
                       or obj_as1.get('summary'))
        content = util.parse_html(raw_content).get_text() if raw_content else raw_content

        urls = as1.object_urls(obj_as1)
        url = urls[0] if urls else None
        if url and not content:
            # heuristics for sniffing URLs and converting them to more friendly
            # phrases and user handles.
            # TODO: standardize this into granary.as2 somewhere?
            from activitypub import FEDI_URL_RE
            from atproto import COLLECTION_TO_TYPE, did_to_handle

            handle = suffix = ''
            if match := FEDI_URL_RE.match(url):
                handle = match.group('handle')
                if match.group('post_id'):
                    suffix = "'s post"
            elif match := BSKY_APP_URL_RE.match(url):
                handle = match.group('id')
                if match.group('tid'):
                    suffix = "'s post"
            elif match := AT_URI_RE.match(url):
                handle = match.group('repo')
                if coll := match.group('collection'):
                    suffix = f"'s {COLLECTION_TO_TYPE.get(coll) or 'post'}"
                url = bluesky.at_uri_to_web_url(url)
            elif url.startswith('did:'):
                handle = url
                url = bluesky.Bluesky.user_url(handle)

            if handle:
                if handle.startswith('did:'):
                    handle = did_to_handle(handle) or handle
                content = f'@{handle}{suffix}'

            if url:
                content = common.pretty_link(url, text=content, user=user)

        obj.content = raw_content
        obj.url = as1.get_url(obj_as1)

        if obj_type in ('like', 'follow', 'repost', 'share') or not obj.content:
            inner_as1 = as1.get_object(obj_as1)
            obj.inner_url = as1.get_url(inner_as1) or inner_as1.get('id')
            if obj.url:
                obj.phrase = common.pretty_link(
                    obj.url, text=obj.phrase, attrs={'class': 'u-url'}, user=user)
            if content:
                obj.content = content
                obj.url = url
            elif obj.inner_url:
                obj.content = common.pretty_link(obj.inner_url, max_length=50)

    return objects, new_before, new_after
def hydrate(activity, fields=('author', 'actor', 'object')):
    """Hydrates fields in an AS1 activity, in place.

    Each named field whose value is only an id (a string, or a dict with no
    key besides ``id``) is looked up asynchronously as an :class:`Object`.
    When the lookup finishes with an object that has AS1 data, that field in
    ``activity`` is replaced with the object's AS1 dict.

    Args:
      activity (dict): AS1 activity
      fields (sequence of str): names of fields to hydrate

    Returns:
      sequence of :class:`google.cloud.ndb.tasklets.Future`: one per field
      being hydrated. Wait on these before using ``activity``.
    """
    def setter_for(field):
        # done-callback that copies the loaded object's AS1 into `field`
        def maybe_set(future):
            loaded = future.result()
            if loaded and loaded.as1:
                activity[field] = loaded.as1
        return maybe_set

    pending = []
    for field in fields:
        val = as1.get_object(activity, field)
        # only bare id references get hydrated; skip empty or full objects
        if not val or not val.keys() <= {'id'}:
            continue

        # TODO: extract a Protocol class method out of User.profile_id,
        # then use that here instead. the catch is that we'd need to
        # determine Protocol for every id, which is expensive.
        #
        # same TODO is in models.fetch_objects
        obj_id = val['id']
        if obj_id.startswith('did:'):
            obj_id = f'at://{obj_id}/app.bsky.actor.profile/self'

        future = Object.get_by_id_async(obj_id)
        future.add_done_callback(setter_for(field))
        pending.append(future)

    return pending
def fetch_page(query, model_class, by=None, max_age=None):
    """Fetches a page of results from a datastore query.

    Uses the ``before`` and ``after`` query params (if provided; should be
    ISO8601 timestamps) and the ``by`` property to identify the page to fetch.
    Results are sorted newest first by ``by``. The returned paging values are
    also taken from ``by``, so they match the property the query filters and
    orders on.

    Args:
      query (google.cloud.ndb.query.Query)
      model_class (class): not used by this function; kept for API
        compatibility
      by (ndb.model.Property): paging property, eg :attr:`Object.updated` or
        :attr:`Object.created`
      max_age (datetime.timedelta): if provided, reject ``before``/``after``
        params older than this, and don't generate paging links past it

    Returns:
      (list of Object or Follower, str, str) tuple: (results, new_before,
      new_after), where new_before and new_after are query param values for
      ``before`` and ``after`` to fetch the previous and next pages,
      respectively

    Invalid, too-old, or conflicting paging params are reported via
    :func:`webutil.flask_util.error`, which this code assumes does not return.
    """
    assert by

    # paging datetimes are compared naive; presumably util.now() is UTC
    now = util.now().replace(tzinfo=None)

    def paging_value(entity):
        # value of the `by` property on an entity. ndb sets a Property's
        # `_code_name` to its attribute name when the model class is built.
        return getattr(entity, by._code_name)

    # if there's a paging param ('before' or 'after'), update query with it
    # TODO: unify this with Bridgy's user page
    def get_paging_param(param):
        val = request.values.get(param)
        if val:
            try:
                # a raw '+' in a query string decodes to ' ', which would
                # mangle UTC offsets like +00:00, so restore it
                dt = util.parse_iso8601(val.replace(' ', '+'))
            except Exception as e:
                error(f"Couldn't parse {param}, {val!r} as ISO8601: {e}")
            if dt.tzinfo:
                dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
            if max_age and now - dt > max_age:
                error(f'{param} is too old')
            return dt

    before = get_paging_param('before')
    after = get_paging_param('after')
    if before and after:
        error("can't handle both before and after")
    elif after:
        query = query.filter(by >= after).order(by)
    elif before:
        query = query.filter(by < before).order(-by)
    else:
        query = query.order(-by)

    query_iter = query.iter()
    # 'after' queries come back oldest first, so always re-sort newest first,
    # by the same property the query pages on
    results = sorted(itertools.islice(query_iter, 0, PAGE_SIZE),
                     key=paging_value, reverse=True)

    # calculate new paging param(s)
    has_next = results and query_iter.probably_has_next()
    new_after = (
        before if before
        else paging_value(results[0]) if has_next and after
        else None)
    if new_after:
        new_after = new_after.isoformat()

    new_before = (
        after if after
        else paging_value(results[-1]) if has_next
        else None)
    if new_before and max_age and now - new_before > max_age:
        # don't link to pages older than the max paging age
        new_before = None
    if new_before:
        new_before = new_before.isoformat()

    return results, new_before, new_after
def load_user(handle_or_id, proto=None, create=False, allow_opt_out=False,
              raise_=False):
    """Loads a user by handle or id.

    Args:
      handle_or_id (str): user handle or id
      proto (Protocol subclass or None): protocol to use. If None, will try
        to determine protocol via Protocol.for_id and Protocol.for_handle
      create (bool): if True, use get_or_create; if False, use get_by_id
      allow_opt_out (bool): whether to return a user if they're currently
        opted out
      raise_ (bool): passed through to ``proto.get_or_create``, and from there
        presumably to :meth:`User.reload_profile`. Also preserved when
        retrying with a leading ``@`` stripped.

    Returns:
      User:

    Raises:
      RuntimeError: if no matching user was found
    """
    # function-level import, presumably to avoid a circular import
    import protocol

    logger.debug(f'loading {handle_or_id}')

    if not proto or proto is protocol.Protocol:
        if not (proto := protocol.Protocol.for_id(handle_or_id)):
            proto, resolved_id = protocol.Protocol.for_handle(handle_or_id)
            if resolved_id:
                handle_or_id = resolved_id

    if not proto:
        if handle_or_id.startswith('@'):
            return load_user(handle_or_id.removeprefix('@'), create=create,
                             allow_opt_out=allow_opt_out, raise_=raise_)
        raise RuntimeError(f"Couldn't determine network for {handle_or_id}")

    if proto.owns_id(handle_or_id) is not False:
        if proto.LABEL == 'web' and util.is_web(handle_or_id):
            if not util.is_homepage(handle_or_id):
                raise RuntimeError(f"{handle_or_id} isn't a web domain or homepage URL")

        # TODO: handle user vs object ids here. this incorrectly assumes that it's
        # a user id. https://github.com/snarfed/bridgy-fed/issues/2281
        user_id = ids.normalize_user_id(id=handle_or_id, proto=proto)
        user = (proto.get_or_create(user_id, allow_opt_out=allow_opt_out, raise_=raise_)
                if create
                else proto.get_by_id(user_id, allow_opt_out=allow_opt_out))
        if not user:
            raise RuntimeError(f"Couldn't load {handle_or_id} on {proto.PHRASE}")
        return user

    logger.debug(f"doesn't look like a {proto.LABEL} user ID, trying as a handle")

    if proto.owns_handle(handle_or_id) is False:
        if handle_or_id.startswith('@'):
            return load_user(handle_or_id.removeprefix('@'), create=create,
                             proto=proto, allow_opt_out=allow_opt_out,
                             raise_=raise_)
        raise RuntimeError(f"{handle_or_id} doesn't look like a user id or handle on {proto.PHRASE}")

    for user in proto.query(ndb.OR(proto.handle == handle_or_id,
                                   proto.handle_as_domain == handle_or_id)):
        # some users may have an old handle stored and indexed, but they've changed
        # their handle since then, so check again in memory
        if user.handle == handle_or_id or user.handle_as_domain == handle_or_id:
            if user.use_instead:
                logger.debug(f'{user.key} use_instead => {user.use_instead}')
                user = user.use_instead.get()
                if not user:
                    # use_instead points to a missing entity; skip it rather
                    # than crash on the attribute accesses below
                    continue
            if (not user.status and user.enabled_protocols) or allow_opt_out:
                return user

    if create:
        user_id = proto.handle_to_id(handle_or_id)
        if not user_id:
            raise RuntimeError(f"{handle_or_id} doesn't look like a handle on {proto.PHRASE}")
        user = proto.get_or_create(user_id, allow_opt_out=allow_opt_out, raise_=raise_)
        if user and user.obj and user.obj.as1:
            return user

    raise RuntimeError(f"Couldn't find bridged {proto.LABEL} account {handle_or_id}")
def maybe_truncate_key_id(id):
    """Clamps ``id`` so its UTF-8 encoding fits in ``_MAX_KEYPART_BYTES`` bytes.

    Args:
      id (str)

    Returns:
      str: ``id`` unchanged if it already fits; otherwise its longest prefix
      that fits. A warning is logged when truncation happens.
    """
    raw = id.encode('utf-8')
    if len(raw) <= _MAX_KEYPART_BYTES:
        return id

    # a byte-level cut can split a multi-byte character; errors='ignore'
    # drops that dangling partial sequence
    shortened = raw[:_MAX_KEYPART_BYTES].decode('utf-8', errors='ignore')
    logger.warning(f'Truncating id {id} to {_MAX_KEYPART_BYTES} bytes: {shortened}')
    return shortened