Source code for protocol

"""Base protocol class and common code."""
from bs4 import BeautifulSoup
import copy
from datetime import datetime, timedelta, timezone
import logging
import os
import re
from threading import Lock
from urllib.parse import urljoin, urlparse

from cachetools import cached, LRUCache
from flask import request
from google.cloud import ndb
from google.cloud.ndb import OR
from google.cloud.ndb.model import _entity_to_protobuf
from granary import as1, as2, source
from granary.source import HTML_ENTITY_RE, html_to_text
from pymemcache.exceptions import (
    MemcacheServerError,
    MemcacheUnexpectedCloseError,
    MemcacheUnknownError,
)
from requests import RequestException
from websockets.exceptions import InvalidStatus
from webutil.appengine_info import DEBUG
from webutil.flask_util import cloud_tasks_only
from webutil.models import MAX_ENTITY_SIZE
from webutil import util
from webutil.util import json_dumps, json_loads
import werkzeug.exceptions
from werkzeug.exceptions import BadGateway, BadRequest, HTTPException

import common
from common import (
    ErrorButDoNotRetryTask,
    report_error,
)
from domains import (
    DOMAINS,
    LOCAL_DOMAINS,
    PRIMARY_DOMAIN,
    PROTOCOL_DOMAINS,
    SUPERDOMAIN,
)
import dms
from domains import DOMAIN_BLOCKLIST
import ids
import memcache
from models import (
    Follower,
    get_original_user_key,
    load_user,
    Object,
    PROTOCOLS,
    PROTOCOLS_BY_KIND,
    Target,
    User,
)
import notifications

OBJECT_REFRESH_AGE = timedelta(days=30)
DELETE_TASK_DELAY = timedelta(minutes=1)
CREATE_MAX_AGE = timedelta(weeks=2)
CREATE_MAX_AGE_EXEMPT_DOMAINS = (
    'alt.store',
)
# WARNING: keep this below the receive queue's min_backoff_seconds in queue.yaml!
MEMCACHE_LEASE_EXPIRATION = timedelta(seconds=25)
MEMCACHE_DOWN_TASK_DELAY = timedelta(minutes=5)
# WARNING: keep this in sync with queue.yaml's receive and webmention task_retry_limit!
TASK_RETRIES_RECEIVE = 4
# https://docs.cloud.google.com/tasks/docs/creating-appengine-handlers#reading-headers
TASK_RETRIES_HEADER = 'X-AppEngine-TaskRetryCount'

# require a follow for users on these domains before we deliver anything from
# them other than their profile
LIMITED_DOMAINS = (os.getenv('LIMITED_DOMAINS', '').split()
                   or util.load_file_lines('limited_domains'))

# domains to allow non-public activities from
NON_PUBLIC_DOMAINS = (
    # bridged from twitter (X). bird.makeup, kilogram.makeup, etc federate
    # tweets as followers-only, but they're public on twitter itself
    '.makeup',
)

DONT_STORE_AS1_TYPES = as1.CRUD_VERBS | set((
    'accept',
    'reject',
    'stop-following',
    'undo',
))
STORE_AS1_TYPES = (as1.ACTOR_TYPES | as1.POST_TYPES | as1.VERBS_WITH_OBJECT
                   - DONT_STORE_AS1_TYPES)

DONT_NOTIFY_TYPES = (
    'block',
)

logger = logging.getLogger(__name__)


[docs] def error(*args, status=299, **kwargs): """Default HTTP status code to 299 to prevent retrying task.""" return common.error(*args, status=status, **kwargs)
def activity_id_memcache_key(id): return memcache.key(f'receive-{id}')
[docs] class Protocol: """Base protocol class. Not to be instantiated; classmethods only.""" ABBREV = None 'str: lower case abbreviation, used in URL paths' PHRASE = None 'str: human-readable name or phrase. Used in phrases like ``Follow this person on {PHRASE}``' OTHER_LABELS = () 'sequence of str: label aliases' LOGO_EMOJI = '' 'str: logo emoji, if any' LOGO_HTML = '' 'str: logo ``<img>`` tag, if any' CONTENT_TYPE = None "str: MIME type of this protocol's native data format, appropriate for the ``Content-Type`` HTTP header." HAS_COPIES = False 'bool: whether this protocol is push and needs us to proactively create "copy" users and objects, as opposed to pulling converted objects on demand' DEFAULT_TARGET = None 'str: optional, the default target URI to send this protocol\'s activities to. May be used as the "shared" target. Often only set if ``HAS_COPIES`` is true.' REQUIRES_AVATAR = False "bool: whether accounts on this protocol are required to have a profile picture. If they don't, their ``User.status`` will be ``blocked``." REQUIRES_NAME = False "bool: whether accounts on this protocol are required to have a profile name that's different than their handle or id. If they don't, their ``User.status`` will be ``blocked``." REQUIRES_OLD_ACCOUNT = False "bool: whether accounts on this protocol are required to be at least :const:`common.OLD_ACCOUNT_AGE` old. If their profile includes creation date and it's not old enough, their ``User.status`` will be ``blocked``." DEFAULT_ENABLED_PROTOCOLS = () 'sequence of str: labels of other protocols that are automatically enabled for this protocol to bridge into' DEFAULT_SERVE_USER_PAGES = False "bool: whether to serve user pages for all of this protocol's users on the fed.brid.gy. If ``False``, user pages will only be served for users who have explictly opted in." SUPPORTED_AS1_TYPES = () 'sequence of str: AS1 objectTypes and verbs that this protocol supports receiving and sending' SUPPORTS_DMS = False 'bool: whether this protocol can receive DMs (chat messages)' USES_OBJECT_FEED = False 'bool: whether to store followers on this protocol in :attr:`Object.feed`.' HTML_PROFILES = False 'bool: whether this protocol supports HTML in profile descriptions. If False, profile descriptions should be plain text.' SEND_REPLIES_TO_ORIG_POSTS_MENTIONS = False "bool: whether replies to this protocol should include the original post's mentions as delivery targets" BOTS_FOLLOW_BACK = False 'bool: when a user on this protocol follows a bot user to enable bridging, does the bot follow them back?' HANDLES_PER_PAY_LEVEL_DOMAIN = None 'int: how many users to allow with handles on the same pay-level domain. None for no limit.' RECEIVE_FILTERS = () 'tuple of callable: filter functions from filters.py to apply to incoming activities. Applied in order, so put the cheapest filters first.' RATE_LIMIT_TYPE = memcache.RateLimitType.LINEAR 'Whether receive and send task rate limiting increases linearly or exponential.' @classmethod @property def LABEL(cls): """str: human-readable lower case name of this protocol, eg ``'activitypub``""" return cls.__name__.lower()
[docs] @staticmethod def for_request(fed=None): """Returns the protocol for the current request. ...based on the request's hostname. Args: fed (str or protocol.Protocol): protocol to return if the current request is on ``fed.brid.gy`` Returns: Protocol: protocol, or None if the provided domain or request hostname domain is not a subdomain of ``brid.gy`` or isn't a known protocol """ return Protocol.for_bridgy_subdomain(request.host, fed=fed)
[docs] @staticmethod def for_bridgy_subdomain(domain_or_url, fed=None): """Returns the protocol for a brid.gy subdomain. Args: domain_or_url (str) fed (str or protocol.Protocol): protocol to return if the current request is on ``fed.brid.gy`` Returns: class: :class:`Protocol` subclass, or None if the provided domain or request hostname domain is not a subdomain of ``brid.gy`` or isn't a known protocol """ domain = (util.domain_from_link(domain_or_url, minimize=False) if util.is_web(domain_or_url) else domain_or_url) if domain == PRIMARY_DOMAIN or domain in LOCAL_DOMAINS: return PROTOCOLS[fed] if isinstance(fed, str) else fed elif domain and domain.endswith(SUPERDOMAIN): label = domain.removesuffix(SUPERDOMAIN) return PROTOCOLS.get(label)
[docs] @classmethod def owns_id(cls, id): """Returns whether this protocol owns the id, or None if it's unclear. To be implemented by subclasses. IDs are string identities that uniquely identify users or objects, and are intended primarily to be machine readable and usable. Compare to handles, which are human-chosen, human-meaningful, and often but not always unique. Some protocols' ids are more or less deterministic based on the id format, eg AT Protocol owns ``at://`` URIs and DIDs. Others, like http(s) URLs, could be owned by eg Web or ActivityPub. This should be a quick guess without expensive side effects, eg no external HTTP fetches to fetch the id itself or otherwise perform discovery. Returns False if the id's domain is in :const:`domains.DOMAIN_BLOCKLIST`. Args: id (str): user id or object id Returns: bool or None: """ return False
[docs] @classmethod def owns_handle(cls, handle, allow_internal=False): """Returns whether this protocol owns the handle, or None if it's unclear. To be implemented by subclasses. Handles are string identities that are human-chosen, human-meaningful, and often but not always unique. Compare to IDs, which uniquely identify users, and are intended primarily to be machine readable and usable. Some protocols' handles are more or less deterministic based on the id format, eg ActivityPub (technically WebFinger) handles are ``@user@instance.com``. Others, like domains, could be owned by eg Web, ActivityPub, AT Protocol, or others. This should be a quick guess without expensive side effects, eg no external HTTP fetches to fetch the id itself or otherwise perform discovery. Args: handle (str) allow_internal (bool): whether to return False for internal domains like ``fed.brid.gy``, ``bsky.brid.gy``, etc Returns: bool or None """ return False
[docs] @classmethod def handle_to_id(cls, handle): """Converts a handle to an id. To be implemented by subclasses. May incur network requests, eg DNS queries or HTTP requests. Avoids blocked or opted out users. Args: handle (str) Returns: str: corresponding id, or None if the handle can't be found """ raise NotImplementedError()
[docs] @classmethod def authed_user_for_request(cls): """Returns the authenticated user id for the current request. Checks authentication on the current request, eg HTTP Signature for ActivityPub. To be implemented by subclasses. Returns: str: authenticated user id, or None if there is no authentication Raises: RuntimeError: if the request's authentication (eg signature) is invalid or otherwise can't be verified """ return None
[docs] @classmethod def key_for(cls, id, allow_opt_out=False): """Returns the :class:`google.cloud.ndb.Key` for a given id's :class:`models.User`. If called via `Protocol.key_for`, infers the appropriate protocol with :meth:`for_id`. If called with a concrete subclass, uses that subclass as is. Args: id (str): allow_opt_out (bool): whether to allow users who are currently opted out Returns: google.cloud.ndb.Key: matching key, or None if the given id is not a valid :class:`User` id for this protocol. """ if cls == Protocol: proto = Protocol.for_id(id) return proto.key_for(id, allow_opt_out=allow_opt_out) if proto else None # load user so that we follow use_instead existing = cls.get_by_id(id, allow_opt_out=True) if existing: if existing.status and not allow_opt_out: return None return existing.key return cls(id=id).key
@staticmethod def _for_id_memcache_key(id, remote=None): """If id is a URL, uses its domain, otherwise returns None. Args: id (str) Returns: (str domain, bool remote) or None """ domain = util.domain_from_link(id) if domain in PROTOCOL_DOMAINS: return id elif remote and util.is_web(id): return domain
[docs] @cached(LRUCache(20000), lock=Lock()) @memcache.memoize(key=_for_id_memcache_key, write=lambda id, remote=True: remote, version=3) @staticmethod def for_id(id, remote=True): """Returns the protocol for a given id. Args: id (str) remote (bool): whether to perform expensive side effects like fetching the id itself over the network, or other discovery. Returns: Protocol subclass: matching protocol, or None if no single known protocol definitively owns this id """ logger.debug(f'Determining protocol for id {id}') if not id: return None # remove our synthetic id fragment, if any # # will this eventually cause false positives for other services that # include our full ids inside their own ids, non-URL-encoded? guess # we'll figure that out if/when it happens. id = id.partition('#bridgy-fed-')[0] if not id: return None if util.is_web(id): # step 1: check for our per-protocol subdomains try: parsed = urlparse(id) except ValueError as e: logger.info(f'urlparse ValueError: {e}') return None is_internal = parsed.path.startswith(ids.INTERNAL_PATH_PREFIX) by_subdomain = Protocol.for_bridgy_subdomain(id) if by_subdomain and not (util.is_homepage(id) or is_internal or id in ids.BOT_ACTOR_AP_IDS): logger.debug(f' {by_subdomain.LABEL} owns id {id}') return by_subdomain # step 2: check if any Protocols say conclusively that they own it # sort to be deterministic protocols = sorted(set(p for p in PROTOCOLS.values() if p), key=lambda p: p.LABEL) candidates = [] for protocol in protocols: owns = protocol.owns_id(id) if owns: logger.debug(f' {protocol.LABEL} owns id {id}') return protocol elif owns is not False: candidates.append(protocol) if len(candidates) == 1: logger.debug(f' {candidates[0].LABEL} owns id {id}') return candidates[0] # step 3: look for existing Objects in the datastore # # note that we don't currently see if this is a copy id because I have FUD # over which Protocol for_id should return in that case...and also because a # protocol may already say definitively above that it owns the id, eg ATProto # with DIDs and at:// URIs. obj = Protocol.load(id, remote=False) if obj and obj.source_protocol: logger.debug(f' {obj.key.id()} owned by source_protocol {obj.source_protocol}') return PROTOCOLS[obj.source_protocol] # step 4: fetch over the network, if necessary if not remote: return None for protocol in candidates: logger.debug(f'Trying {protocol.LABEL}') try: obj = protocol.load(id, local=False, remote=True) if protocol.ABBREV == 'web': # for web, if we fetch and get HTML without microformats, # load returns False but the object will be stored in the # datastore with source_protocol web, and in cache. load it # again manually to check for that. obj = Object.get_by_id(id) if obj and obj.source_protocol != 'web': obj = None if obj: logger.debug(f' {protocol.LABEL} owns id {id}') return protocol except BadGateway: # we tried and failed fetching the id over the network. # this depends on ActivityPub.fetch raising this! return None except HTTPException as e: # internal error we generated ourselves; try next protocol pass except Exception as e: code, _ = util.interpret_http_exception(e) if code: # we tried and failed fetching the id over the network return None raise logger.info(f'No matching protocol found for {id} !') return None
[docs] @cached(LRUCache(20000), lock=Lock()) @staticmethod def for_handle(handle): """Returns the protocol for a given handle. May incur expensive side effects like resolving the handle itself over the network or other discovery. Args: handle (str) Returns: (Protocol subclass, str) tuple: matching protocol and optional id (if resolved), or ``(None, None)`` if no known protocol owns this handle """ # TODO: normalize, eg convert domains to lower case logger.debug(f'Determining protocol for handle {handle}') if not handle: return (None, None) # step 1: check if any Protocols say conclusively that they own it. # sort to be deterministic. protocols = sorted(set(p for p in PROTOCOLS.values() if p), key=lambda p: p.LABEL) candidates = [] for proto in protocols: owns = proto.owns_handle(handle) if owns: logger.debug(f' {proto.LABEL} owns handle {handle}') return (proto, None) elif owns is not False: candidates.append(proto) if len(candidates) == 1: logger.debug(f' {candidates[0].LABEL} owns handle {handle}') return (candidates[0], None) # step 2: look for matching User in the datastore for proto in candidates: user = proto.query(proto.handle == handle).get() if user: if user.status: return (None, None) logger.debug(f' user {user.key} handle {handle}') return (proto, user.key.id()) # step 3: resolve handle to id for proto in candidates: id = proto.handle_to_id(handle) if id: logger.debug(f' {proto.LABEL} resolved handle {handle} to id {id}') return (proto, id) logger.info(f'No matching protocol found for handle {handle} !') return (None, None)
[docs] @classmethod def is_user_at_domain(cls, handle, allow_internal=False): """Returns True if handle is formatted ``user@domain.tld``, False otherwise. Example: ``@user@instance.com`` Args: handle (str) allow_internal (bool): whether the domain can be a Bridgy Fed domain """ parts = handle.split('@') if len(parts) != 2: return False user, domain = parts return bool(user and domain and not cls.is_blocklisted(domain, allow_internal=allow_internal))
[docs] @classmethod def bridged_web_url_for(cls, user, fallback=False): """Returns the web URL for a user's bridged profile in this protocol. For example, for Web user ``alice.com``, :meth:`ATProto.bridged_web_url_for` returns ``https://bsky.app/profile/alice.com.web.brid.gy`` Args: user (models.User) fallback (bool): if True, and bridged users have no canonical user profile URL in this protocol, return the native protocol's profile URL Returns: str, or None if there isn't a canonical URL """ if fallback: return user.web_url()
[docs] @classmethod def actor_key(cls, obj, allow_opt_out=False): """Returns the :class:`User`: key for a given object's author or actor. Args: obj (models.Object) allow_opt_out (bool): whether to return a user key if they're opted out Returns: google.cloud.ndb.key.Key or None: """ owner = as1.get_owner(obj.as1) if owner: return cls.key_for(owner, allow_opt_out=allow_opt_out)
[docs] @classmethod def bot_user_id(cls): """Returns the Web user id for the bot user for this protocol. For example, ``'bsky.brid.gy'`` for ATProto. Returns: str: """ return f'{cls.ABBREV}{SUPERDOMAIN}'
[docs] @classmethod def create_for(cls, user): """Creates or re-activate a copy user in this protocol. Should add the copy user to :attr:`copies`. If the copy user already exists and active, should do nothing. Args: user (models.User): original source user. Shouldn't already have a copy user for this protocol in :attr:`copies`. Raises: ValueError: if we can't create a copy of the given user in this protocol """ raise NotImplementedError()
[docs] @classmethod def send(to_cls, obj, target, from_user=None, orig_obj_id=None): """Sends an outgoing activity. To be implemented by subclasses. Should call ``to_cls.translate_ids(obj.as1)`` before converting it to this Protocol's format. NOTE: if this protocol's ``HAS_COPIES`` is True, and this method creates a copy and sends it, it *must* add that copy to the *object*'s (not activity's) :attr:`copies`, and store it back in the datastore, *in a transaction*! Args: obj (models.Object): with activity to send target (str): destination URL to send to from_user (models.User): user (actor) this activity is from orig_obj_id (str): :class:`models.Object` key id of the "original object" that this object refers to, eg replies to or reposts or likes Returns: bool: True if the activity is sent successfully, False if it is ignored or otherwise unsent due to protocol logic, eg no webmention endpoint, protocol doesn't support the activity type. (Failures are raised as exceptions.) Raises: werkzeug.HTTPException if the request fails """ raise NotImplementedError()
[docs] @classmethod def fetch(cls, obj, **kwargs): """Fetches a protocol-specific object and populates it in an :class:`Object`. Errors are raised as exceptions. If this method returns False, the fetch didn't fail but didn't succeed either, eg the id isn't valid for this protocol, or the fetch didn't return valid data for this protocol. To be implemented by subclasses. Args: obj (models.Object): with the id to fetch. Data is filled into one of the protocol-specific properties, eg ``as2``, ``mf2``, ``bsky``. kwargs: subclass-specific Returns: bool: True if the object was fetched and populated successfully, False otherwise Raises: requests.RequestException, werkzeug.HTTPException, websockets.WebSocketException, etc: if the fetch fails """ raise NotImplementedError()
[docs] @classmethod def convert(cls, obj, from_user=None, **kwargs): """Converts an :class:`Object` to this protocol's data format. For example, an HTML string for :class:`Web`, or a dict with AS2 JSON and ``application/activity+json`` for :class:`ActivityPub`. Just passes through to :meth:`_convert`, then does minor protocol-independent postprocessing. Args: obj (models.Object): from_user (models.User): user (actor) this activity/object is from kwargs: protocol-specific, passed through to :meth:`_convert` Returns: converted object in the protocol's native format, often a dict, or None """ if not obj or not obj.as1: return None id = obj.key.id() if obj.key else obj.as1.get('id') is_crud = obj.as1.get('verb') in as1.CRUD_VERBS base_obj = as1.get_object(obj.as1) if is_crud else obj.as1 orig_our_as1 = obj.our_as1 # post-processing for user profiles if (from_user and from_user.is_profile(obj) and PROTOCOLS.get(obj.source_protocol) != cls and Protocol.for_bridgy_subdomain(id) not in DOMAINS): # TODO: more systematic way to get this that covers all protocols, # eg Nostr NIP-05 web_opted_in = (from_user.LABEL == 'web' and (from_user.last_webmention_in or from_user.has_redirects or from_user.handle_as('atproto') == from_user.key.id())) if not web_opted_in: # mark bridged actors as bots and add "bridged by Bridgy Fed" to # their bios. (web users are special cased, they don't get the label # if they've explicitly enabled Bridgy Fed with redirects or # webmentions.) cls.add_source_links(obj=obj, from_user=from_user) # web is currently opt out, so add [Unofficial] to their display name # to be explicit that they may not have enabled this themselves if from_user.LABEL == 'web': if obj.our_as1 is orig_our_as1: obj.our_as1 = copy.deepcopy(obj.as1) actor = as1.get_object(obj.our_as1) if is_crud else obj.our_as1 if ((name := actor.get('displayName')) and not name.endswith(' [Unofficial]')): actor['displayName'] = f'{name} [Unofficial]' converted = cls._convert(obj, from_user=from_user, **kwargs) obj.our_as1 = orig_our_as1 return converted
[docs] @classmethod def _convert(cls, obj, from_user=None, **kwargs): """Converts an :class:`Object` to this protocol's data format. To be implemented by subclasses. Implementations should generally call :meth:`Protocol.translate_ids` (as their own class) before converting to their format. Args: obj (models.Object): from_user (models.User): user (actor) this activity/object is from kwargs: protocol-specific Returns: converted object in the protocol's native format, often a dict. May return the ``{}`` empty dict if the object can't be converted. """ raise NotImplementedError()
[docs] @classmethod def set_username(to_cls, user, username): """Sets a custom username for a user's bridged account in this protocol. Args: user (models.User) username (str) Raises: ValueError: if the username is invalid RuntimeError: if the username could not be set """ raise NotImplementedError()
[docs] @classmethod def migrate_out(cls, user, to_user_id): """Migrates a bridged account out to be a native account. Args: user (models.User) to_user_id (str) Raises: ValueError: eg if this protocol doesn't own ``to_user_id``, or if ``user`` is on this protocol or not bridged to this protocol """ raise NotImplementedError()
[docs] @classmethod def check_can_migrate_out(cls, user, to_user_id): """Raises an exception if a user can't yet migrate to a native account. For example, if ``to_user_id`` isn't on this protocol, or if ``user`` is on this protocol, or isn't bridged to this protocol. If the user is ready to migrate, returns ``None``. Subclasses may override this to add more criteria, but they should call this implementation first. Args: user (models.User) to_user_id (str) Raises: ValueError: if ``user`` isn't ready to migrate to this protocol yet """ def _error(msg): logger.warning(msg) raise ValueError(msg) if cls.owns_id(to_user_id) is False: _error(f"{to_user_id} doesn't look like an {cls.LABEL} id") elif isinstance(user, cls): _error(f"{user.handle_or_id()} is on {cls.PHRASE}") elif not user.is_enabled(cls): _error(f"{user.handle_or_id()} isn't currently bridged to {cls.PHRASE}")
[docs] @classmethod def migrate_in(cls, user, from_user_id, **kwargs): """Migrates a native account in to be a bridged account. The protocol independent parts are done here; protocol-specific parts are done in :meth:`_migrate_in`, which this wraps. Reloads the user's profile before calling :meth:`_migrate_in`. Args: user (models.User): native user on another protocol to attach the newly imported bridged account to from_user_id (str) kwargs: additional protocol-specific parameters Raises: ValueError: eg if this protocol doesn't own ``from_user_id``, or if ``user`` is on this protocol or already bridged to this protocol """ def _error(msg): logger.warning(msg) raise ValueError(msg) logger.info(f"Migrating in {from_user_id} for {user.key.id()}") # check req'ts if cls.owns_id(from_user_id) is False: _error(f"{from_user_id} doesn't look like an {cls.LABEL} id") elif isinstance(user, cls): _error(f"{user.handle_or_id()} is on {cls.PHRASE}") elif cls.HAS_COPIES and cls.LABEL in user.enabled_protocols: _error(f"{user.handle_or_id()} is already bridged to {cls.PHRASE}") # reload profile try: user.reload_profile() except (RequestException, HTTPException) as e: _, msg = util.interpret_http_exception(e) # migrate! cls._migrate_in(user, from_user_id, **kwargs) user.add('enabled_protocols', cls.LABEL) user.put() # attach profile object if user.obj: if cls.HAS_COPIES: profile_id = ids.profile_id(id=from_user_id, proto=cls) user.obj.remove_copies_on(cls) user.obj.add('copies', Target(uri=profile_id, protocol=cls.LABEL)) user.obj.put() common.create_task(queue='receive', obj_id=user.obj_key.id(), authed_as=user.key.id())
@classmethod def _migrate_in(cls, user, from_user_id, **kwargs): """Protocol-specific parts of migrating in external account. Called by :meth:`migrate_in`, which does most of the work, including calling :meth:`reload_profile` before this. Args: user (models.User): native user on another protocol to attach the newly imported account to. Unused. from_user_id (str): DID of the account to be migrated in kwargs: protocol dependent """ raise NotImplementedError()
[docs] @classmethod def target_for(cls, obj, shared=False): """Returns an :class:`Object`'s delivery target (endpoint). To be implemented by subclasses. Examples: * If obj has ``source_protocol`` ``web``, returns its URL, as a webmention target. * If obj is an ``activitypub`` actor, returns its inbox. * If obj is an ``activitypub`` object, returns it's author's or actor's inbox. Args: obj (models.Object): shared (bool): optional. If True, returns a common/shared endpoint, eg ActivityPub's ``sharedInbox``, that can be reused for multiple recipients for efficiency Returns: str: target endpoint, or None if not available. """ raise NotImplementedError()
[docs] @classmethod def is_blocklisted(cls, url, allow_internal=False): """Returns True if we block the given URL and shouldn't deliver to it. Default implementation here, subclasses may override. Args: url (str): allow_internal (bool): whether to return False for internal domains like ``fed.brid.gy``, ``bsky.brid.gy``, etc """ blocklist = DOMAIN_BLOCKLIST if not DEBUG: blocklist += tuple(util.RESERVED_TLDS | util.LOCAL_TLDS) if not allow_internal: blocklist += DOMAINS return util.domain_or_parent_in(url, blocklist)
[docs] @classmethod def translate_ids(to_cls, obj): """Translates all ids in an AS1 object to a specific protocol. Infers source protocol for each id value separately. For example, if ``proto`` is :class:`ActivityPub`, the ATProto URI ``at://did:plc:abc/coll/123`` will be converted to ``https://bsky.brid.gy/ap/at://did:plc:abc/coll/123``. Wraps these AS1 fields: * ``id`` * ``actor`` * ``author`` * ``bcc`` * ``bto`` * ``cc`` * ``featured[].items``, ``featured[].orderedItems`` * ``object`` * ``object.actor`` * ``object.author`` * ``object.id`` * ``object.inReplyTo`` * ``object.object`` * ``attachments[].id`` * ``tags[objectType=mention].url`` * ``to`` This is the inverse of :meth:`models.Object.resolve_ids`. Much of the same logic is duplicated there! TODO: unify with :meth:`Object.resolve_ids`, :meth:`models.Object.normalize_ids`. Args: to_proto (Protocol subclass) obj (dict): AS1 object or activity (not :class:`models.Object`!) Returns: dict: translated AS1 version of ``obj`` """ from ui import UIProtocol assert to_cls != Protocol if not obj: return obj outer_obj = to_cls.translate_mention_handles(copy.deepcopy(obj)) inner_objs = outer_obj['object'] = as1.get_objects(outer_obj) def translate(elem, field, fn, uri=False): owner_id = as1.get_owner(elem) owner_proto = Protocol.for_id(owner_id) elem[field] = as1.get_objects(elem, field) for obj in elem[field]: if id := obj.get('id'): if field in ('to', 'cc', 'bcc', 'bto') and as1.is_audience(id): continue from_cls = Protocol.for_id(id) if field == 'id' and from_cls == UIProtocol and owner_proto: logger.info(f'owner of {id} {owner_id} is {owner_proto.LABEL}, translating id from that protocol') from_cls = owner_proto # TODO: what if from_cls is None? relax translate_object_id, # make it a noop if we don't know enough about from/to? if from_cls and from_cls != to_cls: obj['id'] = fn(id=id, from_=from_cls, to=to_cls) if uri: obj['id'] = to_cls(id=obj['id']).id_uri() if obj['id'] else id elem[field] = [o['id'] if o.keys() == {'id'} else o for o in elem[field]] if len(elem[field]) == 1 and field not in ('items', 'orderedItems'): elem[field] = elem[field][0] type = as1.object_type(outer_obj) translate(outer_obj, 'id', ids.translate_user_id if type in as1.ACTOR_TYPES else ids.translate_object_id) for o in inner_objs: is_actor = (as1.object_type(o) in as1.ACTOR_TYPES or as1.get_owner(outer_obj) == o.get('id') or type in ('follow', 'stop-following', 'block')) translate(o, 'id', (ids.translate_user_id if is_actor else ids.translate_object_id)) # TODO: need to handle both user and object ids here # https://github.com/snarfed/bridgy-fed/issues/2281 obj_is_actor = o.get('verb') in as1.VERBS_WITH_ACTOR_OBJECT translate(o, 'object', (ids.translate_user_id if obj_is_actor else ids.translate_object_id)) for o in [outer_obj] + inner_objs: translate(o, 'inReplyTo', ids.translate_object_id) for field in 'actor', 'author', 'to', 'cc', 'bto', 'bcc': translate(o, field, ids.translate_user_id) for tag in as1.get_objects(o, 'tags'): if tag.get('objectType') == 'mention': translate(tag, 'url', ids.translate_user_id, uri=True) for att in as1.get_objects(o, 'attachments'): translate(att, 'id', ids.translate_object_id) url = att.get('url') if url and not att.get('id'): if from_cls := Protocol.for_id(url): att['id'] = ids.translate_object_id(from_=from_cls, to=to_cls, id=url) if feat := as1.get_object(o, 'featured'): translate(feat, 'orderedItems', ids.translate_object_id) translate(feat, 'items', ids.translate_object_id) outer_obj = util.trim_nulls(outer_obj) if objs := util.get_list(outer_obj ,'object'): outer_obj['object'] = [o['id'] if o.keys() == {'id'} else o for o in objs] if len(outer_obj['object']) == 1: outer_obj['object'] = outer_obj['object'][0] return outer_obj
[docs] @classmethod def translate_mention_handles(cls, obj): """Translates @-mentions in ``obj.content`` to this protocol's handles. Specifically, for each ``mention`` tag in the object's tags that has ``startIndex`` and ``length``, replaces it in ``obj.content`` with that user's translated handle in this protocol and updates the tag's location. Called by :meth:`Protocol.translate_ids`. If ``obj.content`` is HTML, does nothing. Args: obj (dict): AS1 object Returns: dict: modified AS1 object """ if not obj: return None obj = copy.deepcopy(obj) obj['object'] = [cls.translate_mention_handles(o) for o in as1.get_objects(obj)] if len(obj['object']) == 1: obj['object'] = obj['object'][0] content = obj.get('content') tags = obj.get('tags') if (not content or not tags or obj.get('content_is_html') or bool(BeautifulSoup(content, 'html.parser').find()) or HTML_ENTITY_RE.search(content)): return util.trim_nulls(obj) indexed = [tag for tag in tags if tag.get('startIndex') and tag.get('length')] offset = 0 last_orig_end = 0 for tag in sorted(indexed, key=lambda t: t['startIndex']): orig_start = tag['startIndex'] if orig_start < last_orig_end: logger.warning(f'tags overlap! removing indices from {tag.get("url")}') del tag['startIndex'] del tag['length'] continue orig_end = orig_start + tag['length'] last_orig_end = orig_end tag['startIndex'] += offset if tag.get('objectType') == 'mention' and (id := tag['url']): if proto := Protocol.for_id(id): id = ids.normalize_user_id(id=id, proto=proto) if key := get_original_user_key(id): user = key.get() else: user = proto.get_or_create(id, allow_opt_out=True) if user: start = tag['startIndex'] end = start + tag['length'] if handle := user.handle_as(cls): content = content[:start] + handle + content[end:] offset += len(handle) - tag['length'] tag.update({ 'displayName': handle, 'length': len(handle), }) obj['tags'] = tags as2.set_content(obj, content) # sets content *and* contentMap; obj is still AS1 here return util.trim_nulls(obj)
[docs] @classmethod def receive(from_cls, obj, authed_as=None, internal=False, received_at=None): """Handles an incoming activity. If ``obj``'s key is unset, ``obj.as1``'s id field is used. If both are unset, returns HTTP 299. Args: obj (models.Object) authed_as (str): authenticated actor id who sent this activity internal (bool): whether to allow activity ids on internal domains, from opted out/blocked users, etc. received_at (datetime): when we first saw (received) this activity. Right now only used for monitoring. Returns: (str, int) tuple: (response body, HTTP status code) Flask response Raises: werkzeug.HTTPException: if the request is invalid """ # check some invariants assert from_cls != Protocol assert isinstance(obj, Object), obj if not obj.as1: error('No object data provided') orig_obj = obj id = None if obj.key and obj.key.id(): id = obj.key.id() if not id: id = obj.as1.get('id') obj.key = ndb.Key(Object, id) if not id: error('No id provided') elif from_cls.owns_id(id) is False: error(f'Protocol {from_cls.LABEL} does not own id {id}') elif from_cls.is_blocklisted(id, allow_internal=internal): error(f'{id} is blocklisted') # does this protocol support this activity/object type? from_cls.check_supported(obj, 'receive') # lease this object, atomically memcache_key = activity_id_memcache_key(id) leased = memcache.memcache.add( memcache_key, 'leased', noreply=False, expire=int(MEMCACHE_LEASE_EXPIRATION.total_seconds())) # short circuit if we've already seen this activity id if ('force' not in request.values and (not leased or (obj.new is False and obj.changed is False))): error(f'Already seen', status=204) pruned = {k: v for k, v in obj.as1.items() if k not in ('contentMap', 'replies', 'signature')} delay = '' retry = request.headers.get('X-AppEngine-TaskRetryCount') if (received_at and retry in (None, '0') and obj.type not in ('delete', 'undo')): # we delay deletes/undos delay_s = int((util.now().replace(tzinfo=None) - received_at.replace(tzinfo=None) ).total_seconds()) delay = f'({delay_s} s behind)' logger.info(f'Receiving {from_cls.LABEL} {obj.type} {id} {delay} AS1: {json_dumps(pruned, indent=2)}') # check authorization # https://www.w3.org/wiki/ActivityPub/Primer/Authentication_Authorization actor = as1.get_owner(obj.as1) if not actor: error('Activity missing actor or author') if not (from_user_cls := obj.owner_protocol()): error(f"couldn't determine owner protocol for {obj.key.id()} source_protocol {obj.source_protocol}", status=204) elif from_user_cls.owns_id(actor) is False: error(f"{from_user_cls.LABEL} doesn't own actor {actor}, this is probably a bridged activity. Skipping.", status=204) assert authed_as assert isinstance(authed_as, str) authed_as = ids.normalize_user_id(id=authed_as, proto=from_user_cls) actor = ids.normalize_user_id(id=actor, proto=from_user_cls) if actor != authed_as and not internal: report_error("Auth: receive: authed_as doesn't match owner", user=f'{id} authed_as {authed_as} owner {actor}') error(f"actor {actor} isn't authed user {authed_as}") # update copy ids to originals obj.normalize_ids() obj.resolve_ids() if (obj.type == 'follow' and Protocol.for_bridgy_subdomain(as1.get_object(obj.as1).get('id'))): # follows of bot user; refresh user profile first logger.info(f'Follow of bot user, reloading {actor}') from_user = from_user_cls.get_or_create(id=actor, allow_opt_out=True) from_user.reload_profile() else: # load actor user from_user = from_user_cls.get_or_create(id=actor, allow_opt_out=True) if not internal and (not from_user or from_user.manual_opt_out): error(f"Couldn't load actor {actor}", status=204) # apply protocol-specific filters if 'force' not in request.values: for filter in from_cls.RECEIVE_FILTERS: if filter(obj, from_user): error(f'Activity {id} blocked by filter {filter.__name__}') # check if this is a profile object coming in via a user with use_instead # set. if so, override the object's id to be the final user id (from_user's), # after following use_instead. if obj.type in as1.ACTOR_TYPES and from_user.key.id() != actor: as1_id = obj.as1.get('id') if ids.normalize_user_id(id=as1_id, proto=from_user) == actor: logger.info(f'Overriding AS1 object id {as1_id} with Object id {from_user.profile_id()}') obj.our_as1 = {**obj.as1, 'id': from_user.profile_id()} # if this is an object, ie not an activity, wrap it in a create or update obj = from_cls.handle_bare_object(obj, authed_as=authed_as, from_user=from_user) obj.add('users', from_user.key) inner_obj_as1 = as1.get_object(obj.as1) inner_obj_id = inner_obj_as1.get('id') if obj.type in as1.CRUD_VERBS | as1.VERBS_WITH_OBJECT: if not inner_obj_id: error(f'{obj.type} object has no id!') # check age. we support backdated posts, but if they're over 2w old, we # don't deliver them if obj.type == 'post': if published := inner_obj_as1.get('published'): try: published_dt = util.parse_iso8601(published) if not published_dt.tzinfo: published_dt = published_dt.replace(tzinfo=timezone.utc) age = util.now() - published_dt if (age > CREATE_MAX_AGE and 'force' not in request.values and not util.domain_or_parent_in( from_user.key.id(), CREATE_MAX_AGE_EXEMPT_DOMAINS)): error(f'Ignoring, too old, {age} is over {CREATE_MAX_AGE}', status=204) except ValueError: # from parse_iso8601 logger.debug(f"Couldn't parse published {published}") # write Object to datastore if obj.type in STORE_AS1_TYPES: obj.put() # store inner object # TODO: unify with big obj.type conditional below. would have to merge # this with the DM handling block lower down. crud_obj = None if obj.type in ('post', 'update') and inner_obj_as1.keys() > set(['id']): # normalize_ids may have converted the inner object id to a user id # (eg Web profile URL to domain), so normalize back to the profile # object id to find the right existing Object in the datastore crud_obj_id = (ids.normalize_object_id(id=inner_obj_id, proto=from_cls) or inner_obj_id) crud_obj = Object.get_or_create(crud_obj_id, our_as1=inner_obj_as1, source_protocol=obj.source_protocol, authed_as=actor, users=[from_user.key], deleted=False) actor = as1.get_object(obj.as1, 'actor') actor_id = actor.get('id') # handle activity! if obj.type == 'stop-following': # TODO: unify with handle_follow? # TODO: handle multiple followees if not actor_id or not inner_obj_id: error(f'stop-following requires actor id and object id. Got: {actor_id} {inner_obj_id} {obj.as1}') # deactivate Follower from_ = from_user_cls.key_for(actor_id) if not (to_cls := Protocol.for_id(inner_obj_id)): error(f"Can't determine protocol for {inner_obj_id} , giving up") to = to_cls.key_for(inner_obj_id) follower = Follower.query(Follower.to == to, Follower.from_ == from_, Follower.status == 'active').get() if follower: follow_id = obj.as1.get('followId') if (follow_id and follower.follow and follower.follow.id() != follow_id): logger.info(f"Ignoring stop-following: its follow id {follow_id} doesn't match current {follower.follow.id()}") return 'OK', 204 logger.info(f'Marking {follower} inactive') follower.status = 'inactive' follower.put() else: logger.warning(f'No Follower found for {from_} => {to}') # fall through to deliver to followee # TODO: do we convert stop-following to webmention 410 of original # follow? # fall through to deliver to followers elif obj.type in ('delete', 'undo'): delete_obj_id = (from_user.profile_id() if inner_obj_id == from_user.key.id() else inner_obj_id) delete_obj = Object.get_by_id(delete_obj_id, authed_as=authed_as) if not delete_obj: logger.info(f"Ignoring, we don't have {delete_obj_id} stored") return 'OK', 204 # TODO: just delete altogether! logger.info(f'Marking Object {delete_obj_id} deleted') delete_obj.deleted = True delete_obj.put() # if this is an actor, handle deleting it later so that # in case it's from_user, user.enabled_protocols is still populated # # fall through to deliver to followers and delete copy if necessary. # should happen via protocol-specific copy target and send of # delete activity. # https://github.com/snarfed/bridgy-fed/issues/63 elif obj.type == 'block': if proto := Protocol.for_bridgy_subdomain(inner_obj_id): # blocking protocol bot user disables that protocol from_user.delete(proto) from_user.disable_protocol(proto) return 'OK', 200 elif obj.type == 'move': from_cls.handle_move(obj, from_user=from_user) # fall through to deliver the Move activity to remaining followers elif obj.type == 'post': # handle DMs to bot users if as1.is_dm(obj.as1): return dms.receive(from_user=from_user, obj=obj) # fetch actor if necessary is_user = from_user.is_profile(orig_obj) if (actor and actor.keys() == set(['id']) and not is_user and obj.type not in ('delete', 'undo')): logger.debug('Fetching actor so we have name, profile photo, etc') actor_obj = from_user_cls.load( ids.profile_id(id=actor['id'], proto=from_cls), raise_=False) if actor_obj and actor_obj.as1: obj.our_as1 = { **obj.as1, 'actor': { **actor_obj.as1, # override profile id with actor id # https://github.com/snarfed/bridgy-fed/issues/1720 'id': actor['id'], } } # fetch object if necessary if (obj.type in ('post', 'update', 'share') and inner_obj_as1.keys() == set(['id']) and from_cls.owns_id(inner_obj_id) is not False): logger.debug('Fetching inner object') inner_obj = from_cls.load(inner_obj_id, raise_=False, remote=(obj.type in ('post', 'update'))) if obj.type in ('post', 'update'): crud_obj = inner_obj if inner_obj and inner_obj.as1: obj.our_as1 = { **obj.as1, 'object': { **inner_obj_as1, **inner_obj.as1, } } elif obj.type in ('post', 'update'): error(f"Need object {inner_obj_id} but couldn't fetch, giving up") if obj.type == 'follow': if proto := Protocol.for_bridgy_subdomain(inner_obj_id): # follow of one of our protocol bot users; enable that protocol. # fall through so that we send an accept. try: from_user.enable_protocol(proto) except ErrorButDoNotRetryTask: from web import Web bot = Web.get_by_id(proto.bot_user_id()) from_cls.respond_to_follow('reject', follower=from_user, followee=bot, follow=obj) raise proto.bot_maybe_follow_back(from_user) from_cls.handle_follow(obj, from_user=from_user) return 'OK', 202 from_cls.handle_follow(obj, from_user=from_user) # on update of the user's own actor/profile, set user.obj and store user back # to datastore so that we recalculate computed properties like status etc if is_user: if obj.type == 'update' and crud_obj: logger.info(f"update of the user's profile, re-storing user with obj_key {crud_obj.key.id()}") from_user.obj = crud_obj from_user.put() # deliver to targets resp = from_cls.deliver(obj, from_user=from_user, crud_obj=crud_obj) # on user deleting themselves, deactivate their followers/followings. # https://github.com/snarfed/bridgy-fed/issues/1304 # # do this *after* delivering because delivery finds targets based on # stored Followers if is_user and obj.type == 'delete': for proto in from_user.enabled_protocols: from_user.disable_protocol(PROTOCOLS[proto]) logger.info(f'Deactivating Followers from or to {from_user.key.id()}') followers = Follower.query( OR(Follower.to == from_user.key, Follower.from_ == from_user.key) ).fetch() for f in followers: f.status = 'inactive' ndb.put_multi(followers) memcache.memcache.set(memcache_key, 'done', expire=7 * 24 * 60 * 60) # 1w return resp
[docs] @classmethod def handle_follow(from_cls, obj, from_user): """Handles an incoming follow activity. Sends an ``Accept`` back, but doesn't send the ``Follow`` itself. That happens in :meth:`deliver`. Args: obj (models.Object): follow activity """ logger.debug('Got follow. storing Follow(s), sending accept(s)') from_id = from_user.key.id() # Prepare followee (to) users' data to_as1s = as1.get_objects(obj.as1) if not to_as1s: error(f'Follow activity requires object(s). Got: {obj.as1}') # Store Followers for to_as1 in to_as1s: to_id = to_as1.get('id') if not to_id: error(f'Follow activity requires object(s). Got: {obj.as1}') logger.info(f'Follow {from_id} => {to_id}') to_cls = Protocol.for_id(to_id) if not to_cls: error(f"Couldn't determine protocol for {to_id}") elif from_cls == to_cls: logger.info(f'Skipping same-protocol Follower {from_id} => {to_id}') continue to_key = to_cls.key_for(to_id) if not to_key: logger.info(f'Skipping invalid {to_cls.LABEL} user key: {to_id}') continue to_user = to_cls.get_or_create(id=to_key.id()) if not to_user or not to_user.is_enabled(from_cls): error(f'{to_id} not found') follower_obj = Follower.get_or_create(to=to_user, from_=from_user, follow=obj.key, status='active') if (from_cls.USES_OBJECT_FEED and from_cls.LABEL not in to_user.has_object_feed_followers_on): to_user.has_object_feed_followers_on.append(from_cls.LABEL) to_user.put() obj.add('notify', to_key) from_cls.respond_to_follow('accept', follower=from_user, followee=to_user, follow=obj)
[docs] @classmethod def respond_to_follow(_, verb, follower, followee, follow): """Sends an accept or reject activity for a follow. ...if the follower's protocol supports accepts/rejects. Otherwise, does nothing. Args: verb (str): ``accept`` or ``reject`` follower (models.User) followee (models.User) follow (models.Object) """ assert verb in ('accept', 'reject') if verb not in follower.SUPPORTED_AS1_TYPES: return if not follower.obj or not (target := follower.target_for(follower.obj)): error(f"Couldn't find delivery target for follower {follower.key.id()}") # send. note that this is one response for the whole follow, even if it # has multiple followees! id = f'{followee.key.id()}/followers#{verb}-{follow.key.id()}' accept = { 'id': id, 'objectType': 'activity', 'verb': verb, 'actor': followee.key.id(), 'object': follow.as1, } common.create_task(queue='send', id=id, our_as1=accept, url=target, protocol=follower.LABEL, user=followee.key.urlsafe())
[docs] @classmethod def bot_maybe_follow_back(bot_cls, user): """Follow a user from a protocol bot user, if their protocol needs that. ...so that the protocol starts sending us their activities, if it needs a follow for that (eg ActivityPub). Args: user (User) """ if not user.BOTS_FOLLOW_BACK: return from web import Web bot = Web.get_by_id(bot_cls.bot_user_id()) now = util.now().isoformat() logger.info(f'Following {user.key.id()} back from bot user {bot.key.id()}') if not user.obj: logger.info(" can't follow, user has no profile obj") return target = user.target_for(user.obj) follow_back_id = f'https://{bot.key.id()}/#follow-back-{user.key.id()}-{now}' follow_back_as1 = { 'objectType': 'activity', 'verb': 'follow', 'id': follow_back_id, 'actor': bot.key.id(), 'object': user.key.id(), } common.create_task(queue='send', id=follow_back_id, our_as1=follow_back_as1, url=target, source_protocol='web', protocol=user.LABEL, user=bot.key.urlsafe())
[docs] @classmethod def handle_move(from_cls, obj, from_user): """Handles an incoming move (account migration) activity. Updates all of the account's :class:`Follower`s to point to the new id. Args: obj (models.Object): follow activity from_user (models.User): user (actor) this activity/object is from """ if not (target_id := as1.get_id(obj.as1, 'target')): error(f'Move activity requires target. Got: {obj.as1}') logger.info(f'Got move activity from {from_user.key.id()} to {target_id}') # check that object is the actor (the account being moved) actor_id = as1.get_id(obj.as1, 'actor') object_id = as1.get_id(obj.as1, 'object') if actor_id != object_id: error(f"Move activity object {object_id} isn't actor {actor_id}") # get the target protocol and key to_cls = Protocol.for_id(target_id) if not to_cls: error(f"Couldn't determine protocol for target {target_id}") to_user = to_cls.get_or_create(target_id, manual_opt_out=False, enabled_protocols=from_user.enabled_protocols) if not to_user: error(f"Couldn't create {to_cls.LABEL} user {target_id}", status=299) if from_user.enabled_protocols: # from user has bridged copy accounts; transfer them to the new user for label in from_user.enabled_protocols: proto = PROTOCOLS[label] if copy_id := from_user.get_copy(proto): from_user.remove_copies_on(proto) to_user.add('copies', Target(uri=copy_id, protocol=label)) from_user.put() to_user.put() # query for all active followers of the source account followers = Follower.query( Follower.to == from_user.key, Follower.status == 'active' ).fetch() # update each follower to point to the new account # but skip if it would create a same-protocol follower logger.info(f'Updating {len(followers)} followers from {actor_id} to {target_id}') updated_followers = [] for follower in followers: # check if this would create a same-protocol follower if follower.from_.kind() != to_user.key.kind(): follower.to = to_user.key updated_followers.append(follower) else: logger.info(f'Skipping same-protocol follower {follower.from_.id()} => {to_user.key.id()}') if updated_followers: ndb.put_multi(updated_followers)
[docs] @classmethod def handle_bare_object(cls, obj, *, authed_as, from_user): """If obj is a bare object, wraps it in a create or update activity. Checks if we've seen it before. Args: obj (models.Object) authed_as (str): authenticated actor id who sent this activity from_user (models.User): user (actor) this activity/object is from Returns: models.Object: ``obj`` if it's an activity, otherwise a new object """ is_actor = obj.type in as1.ACTOR_TYPES if not is_actor and obj.type not in ('note', 'article', 'comment'): return obj obj_actor = ids.normalize_user_id(id=as1.get_owner(obj.as1), proto=cls) now = util.now().isoformat() # this is a raw post; wrap it in a create or update activity if obj.changed or is_actor: if obj.changed: logger.info(f'Content has changed from last time at {obj.updated}! Redelivering to all inboxes') else: logger.info(f'Got actor profile object, wrapping in update') id = f'{obj.key.id()}#bridgy-fed-update-{now}' update_as1 = { 'objectType': 'activity', 'verb': 'update', 'id': id, 'actor': obj_actor, 'object': { # Mastodon requires the updated field for Updates, so # add a default value. # https://docs.joinmastodon.org/spec/activitypub/#supported-activities-for-statuses # https://socialhub.activitypub.rocks/t/what-could-be-the-reason-that-my-update-activity-does-not-work/2893/4 # https://github.com/mastodon/documentation/pull/1150 'updated': now, **obj.as1, }, } logger.debug(f' AS1: {json_dumps(update_as1, indent=2)}') return Object(id=id, our_as1=update_as1, source_protocol=obj.source_protocol) if obj.new or 'force' in request.values: create_id = f'{obj.key.id()}#bridgy-fed-create-{now}' create_as1 = { 'objectType': 'activity', 'verb': 'post', 'id': create_id, 'actor': obj_actor, 'object': obj.as1, 'published': now, } logger.info(f'Wrapping in post') logger.debug(f' AS1: {json_dumps(create_as1, indent=2)}') return Object(id=create_id, our_as1=create_as1, source_protocol=obj.source_protocol) error(f'{obj.key.id()} is unchanged, nothing to do', status=204)
[docs] @classmethod def deliver(from_cls, obj, from_user, crud_obj=None, to_proto=None): """Delivers an activity to its external recipients. Args: obj (models.Object): activity to deliver from_user (models.User): user (actor) this activity is from crud_obj (models.Object): if this is a create, update, or delete/undo activity, the inner object that's being written, otherwise None. (This object's ``notify`` and ``feed`` properties may be updated.) to_proto (protocol.Protocol): optional; if provided, only deliver to targets on this protocol Returns: (str, int) tuple: Flask response """ if to_proto: logger.info(f'Only delivering to {to_proto.LABEL}') # find delivery targets. maps Target to Object or None # # ...then write the relevant object, since targets() has a side effect of # setting the notify and feed properties (and dirty attribute) targets = from_cls.targets(obj, from_user=from_user, crud_obj=crud_obj) if to_proto: targets = {t: obj for t, obj in targets.items() if t.protocol == to_proto.LABEL} if not targets: # don't raise via error() because we call deliver in code paths where # we want to continue after msg = r'No targets, nothing to do ¯\_(ツ)_/¯' logger.info(msg) return msg, 204 # store object that targets() updated if crud_obj and crud_obj.dirty: crud_obj.put() elif obj.type in STORE_AS1_TYPES and obj.dirty: obj.put() obj_params = ({'obj_id': obj.key.id()} if obj.type in STORE_AS1_TYPES else obj.to_request()) # sort targets so order is deterministic for tests, debugging, etc sorted_targets = sorted(targets.items(), key=lambda t: t[0].uri) # enqueue send task for each targets logger.info(f'Delivering to {" ".join(t.uri for t, _ in sorted_targets)}') user = from_user.key.urlsafe() for i, (target, orig_obj) in enumerate(sorted_targets): orig_obj_id = orig_obj.key.id() if orig_obj else None common.create_task(queue='send', url=target.uri, protocol=target.protocol, orig_obj_id=orig_obj_id, user=user, **obj_params) return 'OK', 202
[docs] @classmethod def targets(from_cls, obj, from_user, crud_obj=None, internal=False): """Collects the targets to send a :class:`models.Object` to. Targets are both objects - original posts, events, etc - and actors. Args: obj (models.Object) from_user (User) crud_obj (models.Object): if this is a create, update, or delete/undo activity, the inner object that's being written, otherwise None. (This object's ``notify`` and ``feed`` properties may be updated.) internal (bool): whether this is a recursive internal call Returns: dict: maps :class:`models.Target` to original (in response to) :class:`models.Object` """ logger.debug('Finding recipients and their targets') # we should only have crud_obj iff this is a create or update assert (crud_obj is not None) == (obj.type in ('post', 'update')), obj.type write_obj = crud_obj or obj write_obj.dirty = False target_uris = as1.targets(obj.as1) orig_obj = None targets = {} # maps Target (with *normalized* uri) to Object or None owner = as1.get_owner(obj.as1) allow_opt_out = (obj.type == 'delete') inner_obj_as1 = as1.get_object(obj.as1) inner_obj_id = inner_obj_as1.get('id') in_reply_tos = as1.get_ids(inner_obj_as1, 'inReplyTo') quoted_posts = as1.quoted_posts(inner_obj_as1) mentioned_urls = as1.mentions(inner_obj_as1) is_reply = obj.type == 'comment' or in_reply_tos is_self_reply = False original_ids = [] if is_reply: original_ids = in_reply_tos elif inner_obj_id: if inner_obj_id == from_user.key.id(): inner_obj_id = from_user.profile_id() original_ids = [inner_obj_id] # maps id to Object original_objs = {} for id in original_ids: if proto := Protocol.for_id(id): original_objs[id] = proto.load(id, raise_=False) # for AP, add in-reply-tos' mentions # https://github.com/snarfed/bridgy-fed/issues/1608 # https://github.com/snarfed/bridgy-fed/issues/1218 orig_post_mentions = {} # maps mentioned id to original post Object for id in in_reply_tos: if ((in_reply_to_obj := original_objs.get(id)) and (proto := PROTOCOLS.get(in_reply_to_obj.source_protocol)) and proto.SEND_REPLIES_TO_ORIG_POSTS_MENTIONS and (mentions := as1.mentions(in_reply_to_obj.as1))): logger.info(f"Adding in-reply-to {id} 's mentions to targets: {mentions}") target_uris.extend(mentions) for mention in mentions: orig_post_mentions[mention] = in_reply_to_obj target_uris = sorted(set(target_uris)) logger.info(f'Raw targets: {target_uris}') # which protocols should we allow delivering to? to_protocols = [] # elements are Protocol subclasses for label in (list(from_user.DEFAULT_ENABLED_PROTOCOLS) + from_user.enabled_protocols): if not (proto := PROTOCOLS.get(label)): report_error(f'unknown enabled protocol {label} for {from_user.key.id()}') continue if (obj.type == 'post' and (orig := original_objs.get(inner_obj_id)) and orig.get_copy(proto)): logger.info(f'Already created {id} on {label}, cowardly refusing to create there again') continue if proto.HAS_COPIES and (obj.type in ('update', 'delete', 'share', 'undo') or is_reply): origs_could_bridge = None for id in original_ids: if not (orig := original_objs.get(id)): continue elif orig.get_copy(proto): logger.info(f'Allowing {label}, original {id} was bridged there') break elif from_user.is_profile(orig): logger.info(f"Allowing {label}, this is the user's profile") break if (origs_could_bridge is not False and (orig_author_id := as1.get_owner(orig.as1)) and (orig_proto := orig.owner_protocol()) and (orig_author := orig_proto.get_by_id(orig_author_id))): origs_could_bridge = orig_author.is_enabled(proto) else: msg = f"original object(s) {original_ids} weren't bridged to {label}" last_retry = False if retries := request.headers.get(TASK_RETRIES_HEADER): if (last_retry := int(retries) >= TASK_RETRIES_RECEIVE): logger.info(f'last retry! skipping {proto.LABEL} and continuing') if (proto.LABEL not in from_user.DEFAULT_ENABLED_PROTOCOLS and origs_could_bridge and not last_retry): # retry later; original obj may still be bridging # TODO: limit to brief window, eg no older than 2h? 1d? error(msg, status=304) logger.info(msg) continue util.add(to_protocols, proto) logger.info(f'allowed protocols {[p.LABEL for p in to_protocols]}') # process direct targets for target_id in target_uris: target_proto = Protocol.for_id(target_id) if not target_proto: logger.info(f"Can't determine protocol for {target_id}") continue elif target_proto.is_blocklisted(target_id): logger.debug(f'{target_id} is blocklisted') continue target_is_actor = (target_id in mentioned_urls or obj.type in as1.VERBS_WITH_ACTOR_OBJECT) target_obj_id = (ids.profile_id(id=target_id, proto=target_proto) if target_is_actor # not ideal. this can sometimes be a non-user, eg # blocking a blocklist. ok right now since profile_id() # returns its input id unchanged if it doesn't look like # a user id, but that's brittle. else target_id) orig_obj = target_proto.load(target_obj_id, raise_=False) if not orig_obj or not orig_obj.as1: logger.info(f"Couldn't load {target_obj_id}") continue target_author_key = (target_proto(id=target_id).key if target_is_actor else target_proto.actor_key(orig_obj)) if not from_user.is_enabled(target_proto): # if author isn't bridged and target user is, DM a prompt and # add a notif for the target user if (target_id in (in_reply_tos + quoted_posts + mentioned_urls) and target_author_key): if target_author := target_author_key.get(): if target_author.is_enabled(from_cls): notifications.add_notification(target_author, write_obj) verb, noun = ( ('replied to', 'replies') if target_id in in_reply_tos else ('quoted', 'quotes') if target_id in quoted_posts else ('mentioned', 'mentions')) dms.maybe_send(from_=target_proto, to_user=from_user, type='replied_to_bridged_user', text=f"""\ Hi! You <a href="{inner_obj_as1.get('url') or inner_obj_id}">recently {verb}</a> {target_author.html_link()}, who's bridged here from {target_proto.PHRASE}. If you want them to see your {noun}, you can bridge your account into {target_proto.PHRASE} by following this account. <a href="https://fed.brid.gy/docs">See the docs</a> for more information.""") continue # deliver self-replies to followers # https://github.com/snarfed/bridgy-fed/issues/639 if target_id in in_reply_tos and owner == as1.get_owner(orig_obj.as1): is_self_reply = True logger.info(f'self reply!') # also add copies' targets for copy in orig_obj.copies: proto = PROTOCOLS[copy.protocol] if proto in to_protocols: # copies generally won't have their own Objects if target := proto.target_for(Object(id=copy.uri)): target = util.normalize_url(target, trailing_slash=False) logger.debug(f'Adding target {target} for copy {copy.uri} of original {target_id}') targets[Target(protocol=copy.protocol, uri=target)] = orig_obj if target_proto == from_cls: logger.debug(f'Skipping same-protocol target {target_id}') continue target = target_proto.target_for(orig_obj) if not target: # TODO: surface errors like this somehow? logger.error(f"Can't find delivery target for {target_id}") continue target = util.normalize_url(target, trailing_slash=False) logger.debug(f'Target for {target_id} is {target} {target_author_key}') # only use orig_obj for inReplyTos, like/repost objects, reply's original # post's mentions, etc # https://github.com/snarfed/bridgy-fed/issues/1237 target_obj = None if target_id in in_reply_tos + as1.get_ids(obj.as1, 'object'): target_obj = orig_obj elif target_id in orig_post_mentions: target_obj = orig_post_mentions[target_id] targets[Target(protocol=target_proto.LABEL, uri=target)] = target_obj if target_author_key: logger.debug(f'Recipient is {target_author_key}') if obj.type not in DONT_NOTIFY_TYPES: if write_obj.add('notify', target_author_key): write_obj.dirty = True if obj.type == 'undo': logger.info('Object is an undo; adding targets for inner object') if set(inner_obj_as1.keys()) == {'id'}: inner_obj = from_cls.load(inner_obj_id, raise_=False) else: inner_obj = Object(id=inner_obj_id, our_as1=inner_obj_as1) if inner_obj: for target, target_obj in from_cls.targets( inner_obj, from_user=from_user, internal=True).items(): targets[target] = target_obj util.add(to_protocols, PROTOCOLS[target.protocol]) if not to_protocols: return {} logger.info(f'Direct targets: {[t.uri for t in targets.keys()]}') # deliver to followers, if appropriate user_key = from_cls.actor_key(obj, allow_opt_out=allow_opt_out) if not user_key: logger.info("Can't tell who this is from! Skipping followers.") return targets # we deliver to HAS_COPIES protocols separately, below. we assume they have # follower-independent targets. to_followers_protos = [ p for p in to_protocols if not (p.HAS_COPIES and p.DEFAULT_TARGET) and not (p.USES_OBJECT_FEED and p.LABEL not in from_user.has_object_feed_followers_on)] followers = [] is_undo_block = obj.type == 'undo' and inner_obj_as1.get('verb') == 'block' if (obj.type in ('post', 'update', 'delete', 'move', 'share', 'undo') and (not is_reply or is_self_reply) and not is_undo_block and to_followers_protos): logger.info(f'Delivering to followers of {user_key.id()} on {[p.LABEL for p in to_followers_protos]}') # query each protocol individually for proto in to_followers_protos: kind = proto._get_kind() for f in Follower.query( Follower.to == user_key, Follower.status == 'active', Follower.from_ >= ndb.Key(kind, '\x00'), Follower.from_ < ndb.Key(kind + '\x00', '\x00')): # skip protocol bot users if not Protocol.for_bridgy_subdomain(f.from_.id()): followers.append(f) logger.debug(f' loaded {len(followers)} followers') user_keys = [f.from_ for f in followers] users = [u for u in ndb.get_multi(user_keys) if u] logger.debug(f' loaded {len(users)} users') User.load_multi(users) logger.debug(f' loaded user objects') if (not followers and (util.domain_or_parent_in(from_user.key.id(), LIMITED_DOMAINS) or util.domain_or_parent_in(obj.key.id(), LIMITED_DOMAINS))): logger.info(f'skipping, {from_user.key.id()} is on a limited domain and has no followers') return {} # add to followers' feeds, if any if not internal and obj.type in ('post', 'update', 'share'): if write_obj.type not in as1.ACTOR_TYPES: write_obj.feed = [u.key for u in users if u.USES_OBJECT_FEED] if write_obj.feed: write_obj.dirty = True # collect targets for followers target_obj = (original_objs.get(inner_obj_id) if obj.type == 'share' else None) for user in users: if user.is_blocking(from_user): logger.debug(f' {user.key.id()} blocks {from_user.key.id()}') continue # TODO: should we pass remote=False through here to Protocol.load? target = user.target_for(user.obj, shared=True) if user.obj else None if not target: continue target = util.normalize_url(target, trailing_slash=False) targets[Target(protocol=user.LABEL, uri=target)] = target_obj logger.debug(f' collected {len(targets)} targets') # deliver to enabled HAS_COPIES protocols proactively if obj.type in ('post', 'update', 'delete', 'share'): for proto in to_protocols: if proto.HAS_COPIES and proto.DEFAULT_TARGET: logger.info(f'user has {proto.LABEL} enabled, adding {proto.DEFAULT_TARGET}') targets.setdefault( Target(protocol=proto.LABEL, uri=proto.DEFAULT_TARGET), None) # maps string target URL to (Target, Object) tuple candidates = {t.uri: (t, obj) for t, obj in targets.items()} # maps Target to Object or None targets = {} source_domains = [ util.domain_from_link(url) for url in (obj.as1.get('id'), obj.as1.get('url'), as1.get_owner(obj.as1)) if util.is_web(url) ] for url in sorted(util.dedupe_urls( candidates.keys(), # preserve our PDS URL without trailing slash in path # https://atproto.com/specs/did#did-documents trailing_slash=False)): if util.is_web(url) and util.domain_from_link(url) in source_domains: logger.info(f'Skipping same-domain target {url}') continue elif from_user.is_blocking(url): logger.debug(f'{from_user.key.id()} blocks {url}') continue target, obj = candidates[url] targets[target] = obj return targets
[docs] @classmethod def load(cls, id, remote=None, local=True, raise_=True, raw=False, csv=False, **kwargs): """Loads and returns an Object from datastore or HTTP fetch. Sets the :attr:`new` and :attr:`changed` attributes if we know either one for the loaded object, ie local is True and remote is True or None. Args: id (str) remote (bool): whether to fetch the object over the network. If True, fetches even if we already have the object stored, and updates our stored copy. If False and we don't have the object stored, returns None. Default (None) means to fetch over the network only if we don't already have it stored. local (bool): whether to load from the datastore before fetching over the network. If False, still stores back to the datastore after a successful remote fetch. raise_ (bool): if False, catches any :class:`request.RequestException` or :class:`HTTPException` raised by :meth:`fetch()` and returns ``None`` instead raw (bool): whether to load this as a "raw" id, as is, without normalizing to an on-protocol object id. Exact meaning varies by subclass. csv (bool): whether to specifically load a CSV object TODO: merge this into raw, using returned Content-Type? kwargs: passed through to :meth:`fetch()` Returns: models.Object: loaded object, or None if it isn't fetchable, eg a non-URL string for Web, or ``remote`` is False and it isn't in the datastore Raises: requests.HTTPError: anything that :meth:`fetch` raises, if ``raise_`` is True """ assert id assert local or remote is not False # logger.debug(f'Loading Object {id} local={local} remote={remote}') if not raw: id = ids.normalize_object_id(id=id, proto=cls) obj = orig_as1 = None if local: if obj := Object.get_by_id(id): if csv and not obj.is_csv: return None elif obj.as1 or obj.csv or obj.raw or obj.deleted: # logger.debug(f' {id} got from datastore') obj.new = False if remote is False: return obj elif remote is None and obj: if obj.updated < util.as_utc(util.now() - OBJECT_REFRESH_AGE): # logger.debug(f' last updated {obj.updated}, refreshing') pass else: return obj if obj: orig_as1 = obj.as1 obj.our_as1 = None obj.new = False else: if cls == Protocol: return None obj = Object(id=id) if local: # logger.debug(f' {id} not in datastore') obj.new = True obj.changed = False try: fetched = cls.fetch(obj, csv=csv, **kwargs) except (RequestException, HTTPException, InvalidStatus) as e: if raise_: raise util.interpret_http_exception(e) return None if not fetched: return None elif csv and not obj.is_csv: return None # https://stackoverflow.com/a/3042250/186123 size = len(_entity_to_protobuf(obj)._pb.SerializeToString()) if size > MAX_ENTITY_SIZE: logger.warning(f'Object is too big! {size} bytes is over {MAX_ENTITY_SIZE}') return None obj.resolve_ids() obj.normalize_ids() if obj.new is False: obj.changed = obj.activity_changed(orig_as1) if obj.source_protocol not in (cls.LABEL, cls.ABBREV): if obj.source_protocol: logger.warning(f'Object {obj.key.id()} changed protocol from {obj.source_protocol} to {cls.LABEL} ?!') obj.source_protocol = cls.LABEL obj.put() return obj
[docs] @classmethod def check_supported(cls, obj, direction): """If this protocol doesn't support this activity, raises HTTP 204. Also reports an error. (This logic is duplicated in some protocols, eg ActivityPub, so that they can short circuit out early. It generally uses their native formats instead of AS1, before an :class:`models.Object` is created.) Args: obj (Object) direction (str): ``'receive'`` or ``'send'`` Raises: werkzeug.HTTPException: if this protocol doesn't support this object """ assert direction in ('receive', 'send') if not obj.type: return inner = as1.get_object(obj.as1) inner_type = as1.object_type(inner) or '' if (obj.type not in cls.SUPPORTED_AS1_TYPES or (obj.type in as1.CRUD_VERBS and inner_type and inner_type not in cls.SUPPORTED_AS1_TYPES)): error(f"Bridgy Fed for {cls.LABEL} doesn't support {obj.type} {inner_type} yet", status=204) # don't allow posts with blank content and no image/video/audio crud_obj = (as1.get_object(obj.as1) if obj.type in ('post', 'update') else obj.as1) if (crud_obj.get('objectType') in as1.POST_TYPES and not util.get_url(crud_obj, key='image') and not any(util.get_urls(crud_obj, 'attachments', inner_key='stream')) # TODO: handle articles with displayName but not content and not source.html_to_text(crud_obj.get('content')).strip()): error('Blank content and no image or video or audio', status=204) # receiving DMs is only allowed to protocol bot accounts if direction == 'receive': if recip := as1.recipient_if_dm(obj.as1): owner = as1.get_owner(obj.as1) if (not cls.SUPPORTS_DMS or (recip not in common.bot_user_ids() and owner not in common.bot_user_ids())): # reply and say DMs aren't supported from_proto = obj.owner_protocol() to_proto = Protocol.for_id(recip) if owner and from_proto and to_proto: if ((from_user := from_proto.get_or_create(id=owner)) and (to_user := to_proto.get_or_create(id=recip))): in_reply_to = (inner.get('id') if obj.type == 'post' else obj.as1.get('id')) text = f"Hi! Sorry, this account is bridged from {to_user.PHRASE}, so it doesn't support DMs. Try getting in touch another way!" type = f'dms_not_supported-{to_user.key.id()}' dms.maybe_send(from_=to_user, to_user=from_user, text=text, type=type, in_reply_to=in_reply_to) error("Bridgy Fed doesn't support DMs", status=204) # check that this activity is public. only do this for some activities, # not eg likes or follows, since Mastodon doesn't currently mark those # as explicitly public. elif (obj.type in set(('post', 'update')) | as1.POST_TYPES | as1.ACTOR_TYPES and not util.domain_or_parent_in(crud_obj.get('id'), NON_PUBLIC_DOMAINS) and not as1.is_public(obj.as1, unlisted=False)): error('Bridgy Fed only supports public activities', status=204)
[docs] @classmethod def block(cls, from_user, arg): """Blocks a user or list. Args: from_user (models.User): user doing the blocking arg (str): handle or id of user/list to block Returns: models.User or models.Object: user or list that was blocked Raises: ValueError: if arg doesn't look like a user or list on this protocol """ logger.info(f'user {from_user.key.id()} trying to block {arg}') def fail(msg): logger.warning(msg) raise ValueError(msg) blockee = None try: # first, try interpreting as a user handle or id blockee = load_user(arg, proto=cls, create=True, allow_opt_out=True) except (AssertionError, AttributeError, BadRequest, RuntimeError, ValueError) as err: logger.info(err) if type(from_user) == type(blockee): fail(f'{blockee.html_link()} is on {from_user.PHRASE}! Try blocking them there.') # may not be a user, see if it's a list if not blockee: if not cls or cls == Protocol: cls = Protocol.for_id(arg) if cls and (blockee := cls.load(arg)) and blockee.type == 'collection': if blockee.source_protocol == from_user.LABEL: fail(f'{blockee.html_link()} is on {from_user.PHRASE}! Try blocking it there.') else: if blocklist := from_user.add_domain_blocklist(arg): return blocklist fail(f"{arg} doesn't look like a user or list{' on ' + cls.PHRASE if cls else ''}, or we couldn't fetch it") logger.info(f' blocking {blockee.key.id()}') id = f'{from_user.key.id()}#bridgy-fed-block-{util.now().isoformat()}' obj = Object(id=id, source_protocol=from_user.LABEL, our_as1={ 'objectType': 'activity', 'verb': 'block', 'id': id, 'actor': from_user.key.id(), 'object': blockee.key.id(), }) obj.put() from_user.deliver(obj, from_user=from_user) return blockee
[docs] @classmethod def unblock(cls, from_user, arg): """Unblocks a user or list. Args: from_user (models.User): user doing the unblocking arg (str): handle or id of user/list to unblock Returns: models.User or models.Object: user or list that was unblocked Raises: ValueError: if arg doesn't look like a user or list on this protocol """ logger.info(f'user {from_user.key.id()} trying to unblock {arg}') def fail(msg): logger.warning(msg) raise ValueError(msg) blockee = None try: # first, try interpreting as a user handle or id blockee = load_user(arg, cls, create=True, allow_opt_out=True) except (AssertionError, AttributeError, BadRequest, RuntimeError, ValueError) as err: logger.info(err) if type(from_user) == type(blockee): fail(f'{blockee.html_link()} is on {from_user.PHRASE}! Try unblocking them there.') # may not be a user, see if it's a list if not blockee: if not cls or cls == Protocol: cls = Protocol.for_id(arg) if cls and (blockee := cls.load(arg)) and blockee.type == 'collection': if blockee.source_protocol == from_user.LABEL: fail(f'{blockee.html_link()} is on {from_user.PHRASE}! Try blocking it there.') else: if blocklist := from_user.remove_domain_blocklist(arg): return blocklist fail(f"{arg} doesn't look like a user or list{' on ' + cls.PHRASE if cls else ''}, or we couldn't fetch it") logger.info(f' unblocking {blockee.key.id()}') id = f'{from_user.key.id()}#bridgy-fed-unblock-{util.now().isoformat()}' obj = Object(id=id, source_protocol=from_user.LABEL, our_as1={ 'objectType': 'activity', 'verb': 'undo', 'id': id, 'actor': from_user.key.id(), 'object': { 'objectType': 'activity', 'verb': 'block', 'actor': from_user.key.id(), 'object': blockee.key.id(), }, }) obj.put() from_user.deliver(obj, from_user=from_user) return blockee
[docs] @cloud_tasks_only(log=None) def receive_task(): """Task handler for a newly received :class:`models.Object`. Calls :meth:`Protocol.receive` with the form parameters. Parameters: authed_as (str): passed to :meth:`Protocol.receive` obj_id (str): key id of :class:`models.Object` to handle received_at (str, ISO 8601 timestamp): when we first saw (received) this activity *: If ``obj_id`` is unset, all other parameters are properties for a new :class:`models.Object` to handle TODO: migrate incoming webmentions to this. See how we did it for AP. The difficulty is that parts of :meth:`protocol.Protocol.receive` depend on setup in :func:`web.webmention`, eg :class:`models.Object` with ``new`` and ``changed``, HTTP request details, etc. See stash for attempt at this for :class:`web.Web`. """ common.log_request() form = request.form.to_dict() authed_as = form.pop('authed_as', None) internal = authed_as == PRIMARY_DOMAIN or authed_as in PROTOCOL_DOMAINS obj = Object.from_request() assert obj assert obj.source_protocol obj.new = True if received_at := form.pop('received_at', None): received_at = datetime.fromisoformat(received_at) try: return PROTOCOLS[obj.source_protocol].receive( obj=obj, authed_as=authed_as, internal=internal, received_at=received_at) except RequestException as e: util.interpret_http_exception(e) error(e, status=304) except (RuntimeError, ValueError) as e: logger.warning(e, exc_info=True) error(e, status=304)
[docs] @cloud_tasks_only(log=None) def send_task(): """Task handler for sending an activity to a single specific destination. Calls :meth:`Protocol.send` with the form parameters. Parameters: protocol (str): :class:`Protocol` to send to url (str): destination URL to send to obj_id (str): key id of :class:`models.Object` to send orig_obj_id (str): optional, :class:`models.Object` key id of the "original object" that this object refers to, eg replies to or reposts or likes user (url-safe google.cloud.ndb.key.Key): :class:`models.User` (actor) this activity is from *: If ``obj_id`` is unset, all other parameters are properties for a new :class:`models.Object` to handle """ common.log_request() # prepare form = request.form.to_dict() url = form.get('url') protocol = form.get('protocol') if not url or not protocol: logger.warning(f'Missing protocol or url; got {protocol} {url}') return '', 204 target = Target(uri=url, protocol=protocol) obj = Object.from_request() assert obj and obj.key and obj.key.id() PROTOCOLS[protocol].check_supported(obj, 'send') allow_opt_out = (obj.type == 'delete') user = None if user_key := form.get('user'): key = ndb.Key(urlsafe=user_key) # use get_by_id so that we follow use_instead user = PROTOCOLS_BY_KIND[key.kind()].get_by_id( key.id(), allow_opt_out=allow_opt_out) # send delay = '' if request.headers.get('X-AppEngine-TaskRetryCount') == '0' and obj.created: delay_s = int((util.now().replace(tzinfo=None) - obj.created).total_seconds()) delay = f'({delay_s} s behind)' logger.info(f'Sending {obj.source_protocol} {obj.type} {obj.key.id()} to {protocol} {url} {delay}') logger.debug(f' AS1: {json_dumps(obj.as1, indent=2)}') sent = None try: sent = PROTOCOLS[protocol].send(obj, url, from_user=user, orig_obj_id=form.get('orig_obj_id')) except (MemcacheServerError, MemcacheUnexpectedCloseError, MemcacheUnknownError) as e: # our memorystore instance is probably undergoing maintenance. re-enqueue # task with a delay. # https://docs.cloud.google.com/memorystore/docs/memcached/about-maintenance report_error(f'memcache error on send task, re-enqueuing in {MEMCACHE_DOWN_TASK_DELAY}: {e}') common.create_task(queue='send', delay=MEMCACHE_DOWN_TASK_DELAY, **form) sent = False except BaseException as e: code, body = util.interpret_http_exception(e) if not code and not body: raise if sent is False: logger.info(f'Failed sending!') return '', 200 if sent else 204 if sent is False else 304
[docs] @cloud_tasks_only(log=None) def user_enabled_task(): r"""Task handler for when a user enables a protocol. DMs any dormant :class:`models.Follower`\s pointing at the user to let them know the user is now bridged, so they can follow them for real, and flips those ``Follower``\s from ``dormant`` to ``inactive``. Parameters: user (url-safe google.cloud.ndb.key.Key): the :class:`models.User` who enabled bridging protocol (str): ``LABEL`` of the protocol they enabled """ common.log_request() proto = PROTOCOLS[request.form['protocol']] user = ndb.Key(urlsafe=request.form['user']).get() assert user logger.info(f'{user.key.id()} is {user.status or "ok"}') if user.status: raise ErrorButDoNotRetryTask() followers = Follower.query(Follower.to == user.key, Follower.status == 'dormant').fetch() from_users = ndb.get_multi( f.from_ for f in followers if f.from_.kind() == proto._get_kind()) for follower, from_user in zip(followers, from_users): if from_user and not from_user.status: logger.info('Updating and DMing Follower from {from_user.key.id()}') follower.status = 'inactive' follower.put() relationship = { 'bounce': ', who you originally followed before you Bounced,', 'requested': ', who you asked to bridge,', }.get(follower.reason, '') dms.maybe_send(from_=proto, to_user=from_user, text=f'<p>Hi! {user.html_link(proto=proto, proto_fallback=True)}{relationship} has bridged their account into {proto.PHRASE}. You can follow them now if you want.') return '', 200
[docs] @cloud_tasks_only(log=None) def migrate_out_task(): """Task handler for finishing a migration out. Currently, for migrating out to ATProto, uploads the user's blobs to the new PDS. Otherwise, does nothing. Parameters: user (str, url-safe ndb.Key of a User): the bridged :class:`models.User` migrating out protocol (str): destination protocol auth (optional url-safe ndb.Key of an oauth-dropins auth entity): the user's new account. For ATProto, an :class:`oauth_dropins.bluesky.BlueskyAuth`. """ from atproto import ATProto common.log_request() user = ndb.Key(urlsafe=request.form['user']).get() if not user: raise ErrorButDoNotRetryTask() if request.form['protocol'] == ATProto.LABEL: auth = ndb.Key(urlsafe=request.form['auth']).get() assert auth ATProto.migrate_out_blobs(user, auth) return '', 200