"""Datastore model classes."""
import copy
from datetime import timedelta, timezone
from functools import cached_property, lru_cache
import itertools
import json
import logging
import random
import re
from threading import Lock
from urllib.parse import quote, urlparse
import csv
import io
from arroba.util import parse_at_uri
import cachetools
from Crypto.PublicKey import RSA
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.serialization import (
Encoding,
NoEncryption,
PrivateFormat,
PublicFormat,
)
from flask import request
from google.cloud import ndb
from google.cloud.ndb.key import _MAX_KEYPART_BYTES
from granary import as1, as2, atom, bluesky, microformats2
from granary.bluesky import BSKY_APP_URL_RE
import granary.farcaster
from granary.generated.farcaster.message_pb2 import (
Message,
MESSAGE_TYPE_USER_DATA_ADD,
)
from granary.generated.farcaster.request_response_pb2 import MessagesResponse
import granary.nostr
from granary.source import html_to_text
import humanize
from lexrpc.base import AT_URI_RE
from requests import RequestException
import secp256k1
from webutil import util
from webutil.appengine_info import DEBUG
from webutil.flask_util import error
from webutil.models import (
EncryptedProperty,
JsonProperty,
stored_value,
StringIdModel,
)
from webutil.util import ellipsize, json_dumps, json_loads
import common
from common import (
OLD_ACCOUNT_AGE,
report_error,
)
import domains
from domains import (
BLOG_REDIRECT_DOMAINS,
DOMAIN_BLOCKLIST_CANARIES,
DOMAIN_RE,
PRIMARY_DOMAIN,
PROTOCOL_DOMAINS,
unwrap,
)
import ids
import memcache
# Maps each protocol label string to its Protocol subclass. The keys are fixed
# here up front; the values start as None and are filled in later by
# ProtocolUserMeta.
#
# (The keys used to be populated by ProtocolUserMeta too, but that was awkward
# for datastore model properties that use them as choices, below: it required
# overriding those properties in reset_model_properties, which was always flaky.)
PROTOCOLS = dict.fromkeys((
    'activitypub',
    'ap',
    'atproto',
    'bsky',
    'nostr',
    'ostatus',
    'web',
    'webmention',
    'ui',
))

# Extra labels that are only registered when DEBUG is on.
DEBUG_PROTOCOLS = (
    'fa',
    'fake',
    'efake',
    'farcaster',
    'fc',
    'other',
)

if DEBUG:
    PROTOCOLS.update(dict.fromkeys(DEBUG_PROTOCOLS))
# Maps datastore kind string (eg 'MagicKey') to Protocol subclass. Populated in
# ProtocolUserMeta.
PROTOCOLS_BY_KIND = {}

# 2048 bit keys make tests slow, so DEBUG uses 1024 instead
KEY_BITS = 2048 if not DEBUG else 1024

PAGE_SIZE = 20

# Most old objects are auto deleted via the Object.expire property:
# https://cloud.google.com/datastore/docs/ttl
#
# Follows need to be kept because we attach them to Followers and use them for
# unfollows.
DONT_EXPIRE_OBJECT_TYPES = as1.ACTOR_TYPES
EXPIRE_LATE_OBJECT_TYPES = as1.POST_TYPES | {
    'block',
    'flag',
    'follow',
    'like',
    'share',
}
OBJECT_EARLY_EXPIRE_AGE = timedelta(days=60)
OBJECT_LATE_EXPIRE_AGE = timedelta(days=180)

GET_ORIGINALS_CACHE_EXPIRATION = timedelta(days=1)
FOLLOWERS_CACHE_EXPIRATION = timedelta(hours=2)

# Image proxy; see https://www.cloudimage.io/
IMAGE_PROXY_URL_BASE = 'https://xaasg3w5.cloudimg.io/'
IMAGE_PROXY_DOMAINS = ('threads.net',)
# Human-readable explanations for each User.status value, keyed by status.
# Used by User.status_description, which formats each value with
# format(user=...), so any literal braces added here need to be doubled.
USER_STATUS_DESCRIPTIONS = {  # keep in sync with DM.type's docstring!
    'moved': 'account has migrated to another account',
    'no-feed-or-webmention': "web site doesn't have an RSS or Atom feed or webmention endpoint",
    'nobot': "profile has 'nobot' in it",
    'nobridge': "profile has 'nobridge' in it",
    'no-nip05': "account's NIP-05 identifier is missing or invalid",
    'no-profile': 'profile is missing or empty',
    'opt-out': 'account or instance has requested to be opted out',
    'over-handle-domain-limit': "handle's domain has too many users on it",
    'owns-webfinger': 'web site looks like a fediverse instance because it already serves Webfinger',
    'private': 'account is set as private or protected',
    'requires-avatar': "account doesn't have a profile picture",
    'requires-name': "account's name and username are the same",
    # the age threshold is rendered once, at import time
    'requires-old-account': f"account is less than {humanize.naturaldelta(OLD_ACCOUNT_AGE)} old",
    'unsupported-handle-ap': f"<a href='https://fed.brid.gy/docs#fediverse-get-started'>username has characters that Bridgy Fed doesn't currently support</a>",
}

# used as the User.handle_pay_level_domain value when there are too many users on
# the pay-level domain
OVER_LIMIT = 'too-many'

# module-level logger, named after this module
logger = logging.getLogger(__name__)
[docs]
class Target(ndb.Model):
    r""":class:`protocol.Protocol` + URI pairs for identifying objects.

    These are currently used for:

    * delivery destinations, eg ActivityPub inboxes, webmention targets, etc.
    * copies of :class:`Object`\s and :class:`User`\s elsewhere,
      eg ``at://`` URIs for ATProto records, nevent etc bech32-encoded Nostr ids,
      ATProto user DIDs, etc.

    Used in :class:`google.cloud.ndb.model.StructuredProperty`\s inside
    :class:`Object` and :class:`User`; not stored as top-level entities in the
    datastore.

    ndb implements this by hoisting each property here into a corresponding
    property on the parent entity, prefixed by the StructuredProperty name
    below, eg ``delivered.uri``, ``delivered.protocol``, etc.

    For repeated StructuredPropertys, the hoisted properties are all repeated on
    the parent entity, and reconstructed into StructuredPropertys based on their
    order.
    """
    uri = ndb.StringProperty(required=True)
    ''
    protocol = ndb.StringProperty(choices=list(PROTOCOLS.keys()), required=True)
    ''

    def __eq__(self, other):
        """Equality excludes :class:`Key`.

        Two targets are equal if their ``uri`` and ``protocol`` match. Anything
        that isn't a :class:`Target` compares unequal.

        Returns:
          bool:
        """
        if not isinstance(other, Target):
            # explicit False instead of falling off the end and returning None
            return False
        return self.uri == other.uri and self.protocol == other.protocol

    def __hash__(self):
        """Allow hashing so these can be dict keys."""
        return hash((self.protocol, self.uri))
[docs]
class DM(ndb.Model):
    """:class:`protocol.Protocol` + type pairs for identifying sent DMs.

    Used in :attr:`User.sent_dms`.

    https://googleapis.dev/python/python-ndb/latest/model.html#google.cloud.ndb.model.StructuredProperty
    """
    type = ndb.StringProperty(required=True)
    """Known values (keep in sync with USER_STATUS_DESCRIPTIONS, the subset for
    ineligible users):

    * dms_not_supported-[RECIPIENT-USER-ID]
    * moved
    * no-feed-or-webmention
    * no-nip05
    * no-profile
    * nobridge
    * opt-out
    * over-handle-domain-limit
    * owns-webfinger
    * private
    * replied_to_bridged_user
    * request_bridging
    * requires-avatar
    * requires-name
    * requires-old-account
    * unsupported-handle-ap
    * welcome
    """
    protocol = ndb.StringProperty(choices=list(PROTOCOLS.keys()), required=True)
    ''

    def __eq__(self, other):
        """Equality excludes :class:`Key`.

        Two DMs are equal if their ``type`` and ``protocol`` match. Anything
        that isn't a :class:`DM` compares unequal instead of raising
        :class:`AttributeError`.

        Returns:
          bool:
        """
        if not isinstance(other, DM):
            return False
        return self.type == other.type and self.protocol == other.protocol
[docs]
class KeyPair(ndb.Model):
    """A user's public/private key pair for a single protocol.

    Used in :attr:`User.keypairs` ; not stored as top-level entities in the
    datastore. The private key is encrypted at rest; the public key is not.

    Format per ``algorithm``:

    * ``rsa``: ``private_key_bytes`` is PKCS#1 PEM
      (``-----BEGIN RSA PRIVATE KEY-----``); ``public_key_bytes`` is SPKI PEM
      (``-----BEGIN PUBLIC KEY-----``).
    * ``secp256k1``: 32 raw bytes private, 32 raw bytes BIP-340 x-only public.
    * ``ed25519``: 32 raw bytes private, 32 raw bytes public.

    Details for each protocol:

    * ActivityPub: RSA
    * ATProto: secp256k1, with ECDSA signatures
      (keypair is stored in :class:`arroba.datastore_storage.AtpRepo`, *not* here)
      https://atproto.com/specs/cryptography
    * Farcaster: Ed25519, with EdDSA signatures
      https://docs.farcaster.xyz/reference/farcaster/intent-urls#resource-urls
    * Nostr: secp256k1, with Schnorr signatures
      https://github.com/nostr-protocol/nips/blob/master/01.md#events-and-signatures
    """
    protocol = ndb.StringProperty(choices=list(PROTOCOLS.keys()), required=True)
    ''
    algorithm = ndb.StringProperty(choices=('ed25519', 'rsa', 'secp256k1'),
                                   required=True)
    ''
    public_key_bytes = ndb.BlobProperty(required=True)
    ''
    private_key_bytes = EncryptedProperty(required=True)
    ''

    def __eq__(self, other):
        """Equality excludes :class:`Key`.

        Two key pairs are equal if their protocol, algorithm, and both keys
        match. Anything that isn't a :class:`KeyPair` compares unequal.

        Returns:
          bool:
        """
        if not isinstance(other, KeyPair):
            # explicit False instead of falling off the end and returning None
            return False
        return (self.protocol == other.protocol
                and self.algorithm == other.algorithm
                and self.public_key_bytes == other.public_key_bytes
                and self.private_key_bytes == other.private_key_bytes)
[docs]
def reset_protocol_properties():
    """Rebuilds module-level values that are derived from ``PROTOCOLS``.

    Specifically:

    * ``domains.SUBDOMAIN_BASE_URL_RE``, recompiled so that it matches every
      label currently in ``PROTOCOLS``, plus ``fed``
    * ``ids.COPIES_PROTOCOLS``, the labels whose protocol class is populated and
      has a truthy ``HAS_COPIES``
    """
    labels = '|'.join(PROTOCOLS)
    abbrevs = f'({labels}|fed)'
    pattern = rf'^https?://({abbrevs}\.brid\.gy|localhost(:8080)?)/(convert/|r/)?({abbrevs}/)?(?P<path>.+)'
    domains.SUBDOMAIN_BASE_URL_RE = re.compile(pattern)

    with_copies = []
    for label, proto in PROTOCOLS.items():
        if proto and proto.HAS_COPIES:
            with_copies.append(label)
    ids.COPIES_PROTOCOLS = tuple(with_copies)
[docs]
@lru_cache(maxsize=100000)
@memcache.memoize(expire=GET_ORIGINALS_CACHE_EXPIRATION)
def get_original_object_key(copy_id):
    """Looks up the :class:`Object` that has ``copy_id`` as one of its copies.

    Results are cached both in process, via :func:`functools.lru_cache`, and in
    memcache, via :func:`memcache.memoize`.

    Note that :meth:`Object.add` also updates this function's
    :func:`memcache.memoize` cache.

    Args:
      copy_id (str): must be non-empty

    Returns:
      google.cloud.ndb.Key or None
    """
    assert copy_id
    matching = Object.query(Object.copies.uri == copy_id)
    return matching.get(keys_only=True)
[docs]
@lru_cache(maxsize=100000)
@memcache.memoize(expire=GET_ORIGINALS_CACHE_EXPIRATION)
def get_original_user_key(copy_id):
    """Finds the user with a given copy id, if any.

    Queries each protocol's users for one whose ``copies`` include ``copy_id``.
    The ``ui`` protocol, and protocols that own ``copy_id`` themselves, are
    skipped.

    Note that :meth:`User.add` also updates this function's
    :func:`memcache.memoize` cache.

    Args:
      copy_id (str): must be non-empty

    Returns:
      google.cloud.ndb.Key or None
    """
    assert copy_id

    # PROTOCOLS can map more than one label to the same class, so dedupe
    # (preserving order) to avoid running the same datastore query repeatedly
    for proto in dict.fromkeys(PROTOCOLS.values()):
        if not proto or proto.LABEL == 'ui' or proto.owns_id(copy_id):
            continue
        if orig := proto.query(proto.copies.uri == copy_id).get(keys_only=True):
            return orig

    return None
[docs]
class AddRemoveMixin:
    """Mixin class that defines the :meth:`add` and :meth:`remove` methods.

    If a subclass of this mixin defines the ``GET_ORIGINAL_FN`` class-level
    attribute, that function's memoize cache is updated when :meth:`add` is
    called with the ``copies`` property, and cleared when :meth:`remove` is.
    Subclasses that don't define it skip the cache handling.
    """

    lock = None
    """Synchronizes :meth:`add`, :meth:`remove`, etc."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.lock = Lock()

    def add(self, prop, val):
        """Adds a value to a multiply-valued property.

        Args:
          prop (str)
          val

        Returns:
          True if val was added, ie it wasn't already in prop, False otherwise
        """
        with self.lock:
            added = util.add(getattr(self, prop), val)

        if prop == 'copies' and added:
            # GET_ORIGINAL_FN is optional, so default to None if it's missing
            if fn := getattr(self, 'GET_ORIGINAL_FN', None):
                memcache.pickle_memcache.set(memcache.memoize_key(fn, val.uri),
                                             self.key)

        return added

    def remove(self, prop, val):
        """Removes a value from a multiply-valued property.

        No-op on the property itself if ``val`` isn't in it.

        Args:
          prop (str)
          val
        """
        with self.lock:
            existing = getattr(self, prop)
            if val in existing:
                existing.remove(val)

        if prop == 'copies':
            self.clear_get_original_cache(val.uri)

    def remove_copies_on(self, proto):
        """Removes all copies on a given protocol.

        ``proto.HAS_COPIES`` must be True.

        Args:
          proto (protocol.Protocol subclass)
        """
        assert proto.HAS_COPIES

        # iterate over a snapshot: remove() mutates self.copies, and mutating a
        # list while iterating over it skips the element after each removal
        for copy in list(self.copies):
            if copy.protocol in (proto.ABBREV, proto.LABEL):
                self.remove('copies', copy)

    @classmethod
    def clear_get_original_cache(cls, uri):
        """Deletes ``uri`` from the ``GET_ORIGINAL_FN`` memoize cache, if any.

        Args:
          uri (str): copy id
        """
        # GET_ORIGINAL_FN is optional, so default to None if it's missing
        if fn := getattr(cls, 'GET_ORIGINAL_FN', None):
            memcache.pickle_memcache.delete(memcache.memoize_key(fn, uri))
# WARNING: AddRemoveMixin *must* be before StringIdModel here so that its __init__
# gets called! Due to an (arguable) ndb.Model bug:
# https://github.com/googleapis/python-ndb/issues/1025
[docs]
class User(AddRemoveMixin, StringIdModel, metaclass=ProtocolUserMeta):
"""Abstract base class for a Bridgy Fed user."""
GET_ORIGINAL_FN = get_original_user_key
'used by AddRemoveMixin'
obj_key = ndb.KeyProperty(kind='Object') # user profile
''
use_instead = ndb.KeyProperty()
''
copies = ndb.StructuredProperty(Target, repeated=True)
"""Proxy copies of this user elsewhere, eg DIDs for ATProto records, bech32
npub Nostr ids, etc. Similar to ``rel-me`` links in microformats2,
``alsoKnownAs`` in DID docs (and now AS2), etc.
"""
keypairs = ndb.StructuredProperty(KeyPair, repeated=True)
"""Key pairs for this user, one per bridged protocol. Encrypted at rest."""
manual_opt_out = ndb.BooleanProperty()
"""Set to True to manually disable this user. Set to False to override spam filters and forcibly enable this user."""
enabled_protocols = ndb.StringProperty(repeated=True,
choices=list(PROTOCOLS.keys()))
"""Protocols that this user has explicitly opted into.
Protocols that don't require explicit opt in are omitted here.
"""
has_object_feed_followers_on = ndb.StringProperty(repeated=True,
choices=list(PROTOCOLS.keys()))
"""Protocol labels of protocols that use :attr:`~Protocol.USES_OBJECT_FEED` and have ever had a follower of this user."""
sent_dms = ndb.StructuredProperty(DM, repeated=True)
"""DMs that we've attempted to send to this user."""
send_notifs = ndb.StringProperty(default='all', choices=('all', 'none'))
"""Which notifications we should send this user."""
blocks = ndb.KeyProperty(kind='Object', repeated=True)
''
verified_domain = ndb.StringProperty()
"""Domain that we've verified this user owns, eg web site top-level NIP-05, etc."""
created = ndb.DateTimeProperty(auto_now_add=True)
''
updated = ndb.DateTimeProperty(auto_now=True)
''
# `existing` attr is set by get_or_create
# OLD. some stored entities still have these; do not reuse.
# direct = ndb.BooleanProperty(default=False)
# actor_as2 = JsonProperty()
# protocol-specific state
# atproto_notifs_indexed_at = ndb.TextProperty()
# atproto_feed_indexed_at = ndb.TextProperty()
# mod = ndb.StringProperty() # https://github.com/snarfed/bridgy-fed/issues/794
# public_exponent = ndb.StringProperty()
# private_exponent = ndb.StringProperty()
# nostr_key_bytes = EncryptedProperty()
def __init__(self, **kwargs):
    """Constructor.

    Pulls ``obj`` out of ``kwargs`` and assigns it after the base constructor
    has run, because however :class:`google.cloud.ndb.model.Model` sets it
    doesn't work with the ``@property`` and ``@obj.setter`` below.

    Args:
      kwargs: passed through to the superclass constructor, minus ``obj``
    """
    profile = kwargs.pop('obj', None)
    super().__init__(**kwargs)
    if not profile:
        return
    self.obj = profile
[docs]
@classmethod
def new(cls, **kwargs):
    """Always raises; tries to prevent instantiation. Use subclasses instead.

    Raises:
      NotImplementedError: always
    """
    raise NotImplementedError
def _post_put_hook(self, future):
    """Logs this entity's key at debug level.

    Args:
      future: unused
    """
    written = self.key
    logger.debug(f'Wrote {written}')
[docs]
@classmethod
def get_by_id(cls, id, allow_opt_out=False, **kwargs):
    """Loads a user by id, following ``use_instead`` and checking ``status``.

    ``use_instead`` is followed at most one hop.

    Args:
      id (str)
      allow_opt_out (bool): whether to return the user even if their
        ``status`` is set
      kwargs: passed through to ``_get_by_id``

    Returns:
      User: or None if the user doesn't exist, or if they have a ``status``
      (eg opted out) and ``allow_opt_out`` is False
    """
    found = cls._get_by_id(id, **kwargs)

    if found and found.use_instead:
        logger.debug(f'{found.key} use_instead => {found.use_instead}')
        found = found.use_instead.get()

    if not found:
        return None

    if found.status and not allow_opt_out:
        logger.info(f'{found.key} is {found.status}')
        return None

    return found
[docs]
@classmethod
def get_or_create(cls, id, propagate=False, allow_opt_out=False,
                  reload=False, raise_=False, **kwargs):
    """Loads and returns a :class:`User`. Creates it if necessary.

    If ``allow_opt_out`` is False and ``id`` is the bridged id for a user in
    another protocol, returns that user instead. Note that they'll be a
    different type than ``cls``!

    Not transactional because transactions don't read or write memcache. :/
    Fortunately we don't really depend on atomicity for much, last writer wins
    is usually fine.

    Sets the ``existing`` attribute on the returned user: True if it was
    already stored under ``id``, False otherwise.

    Args:
      id (str): user id
      propagate (bool): whether to create copies of this user in push-based
        protocols, eg ATProto and Nostr.
      allow_opt_out (bool): whether to allow and create the user if they're
        currently opted out
      reload (bool): whether to reload profile always, vs only if necessary
      raise_ (bool): passed through to :meth:`User.reload_profile`. If True,
        and the user's profile can't be loaded, :meth:`User.reload_profile`
        raises :class:`RuntimeError`
      kwargs: passed through to ``cls`` constructor

    Returns:
      User: existing or new user, or None if the user is opted out
    """
    assert cls != User

    # TODO?
    # id = ids.normalize_user_id(id=id, proto=cls)
    user = cls.get_by_id(id, allow_opt_out=True)
    if user:  # existing
        if reload:
            user.reload_profile(gateway=True, raise_=raise_)

        if user.status and not allow_opt_out:
            return None
        user.existing = True

        # TODO: propagate more fields?
        # only fill in fields that are currently unset; never overwrite
        changed = False
        for field in ['obj', 'obj_key', 'manual_opt_out']:
            old_val = getattr(user, field, None)
            new_val = kwargs.get(field)
            if old_val is None and new_val is not None:
                setattr(user, field, new_val)
                changed = True

        if enabled_protocols := kwargs.get('enabled_protocols'):
            user.enabled_protocols = (set(user.enabled_protocols)
                                      | set(enabled_protocols))
            changed = True

        if not propagate:
            if changed:
                try:
                    user.put()
                except AssertionError as e:
                    logger.debug(e)
                    error(f'Bad {cls.__name__} id {id} : {e}')
            return user

    else:  # new, not existing
        if not allow_opt_out and (orig_key := get_original_user_key(id)):
            # get_original_user_key is memoized, so the key may be stale, ie
            # its entity may have been deleted since. only use the original
            # user if it still exists; otherwise fall through and create.
            if orig := orig_key.get():
                if orig.status:
                    return None
                orig.existing = False
                return orig

        user = cls(id=id, **kwargs)
        user.existing = False
        try:
            user.reload_profile(gateway=True, raise_=raise_)
        except AssertionError as e:
            logger.debug(e)
            error(f'Bad {cls.__name__} id {id} : {e}')

        if user.status and not allow_opt_out:
            return None

    if propagate and user.status in (None, 'private'):
        for label in user.enabled_protocols + list(user.DEFAULT_ENABLED_PROTOCOLS):
            proto = PROTOCOLS[label]
            if proto == cls:
                continue
            elif proto.HAS_COPIES:
                if not user.get_copy(proto) and user.is_enabled(proto):
                    try:
                        proto.create_for(user)
                    except (ValueError, AssertionError):
                        logger.info(f'failed creating {proto.LABEL} copy',
                                    exc_info=True)
                        user.remove('enabled_protocols', proto.LABEL)
                else:
                    logger.debug(f'{proto.LABEL} not enabled or user copy already exists, skipping propagate')

    try:
        user.put()
    except AssertionError as e:
        error(f'Bad {cls.__name__} id {id} : {e}')

    logger.debug(('Updated ' if user.existing else 'Created new ') + str(user))
    return user
@property
def obj(self):
    """Convenience accessor that loads :attr:`obj_key` from the datastore.

    The loaded object is cached on this instance in ``_obj``, so the datastore
    is only hit the first time.

    Returns:
      Object: or None if :attr:`obj_key` is unset
    """
    if not self.obj_key:
        return None

    if not hasattr(self, '_obj'):
        self._obj = self.obj_key.get()
    return self._obj

@obj.setter
def obj(self, obj):
    """Sets both the cached profile object and :attr:`obj_key`.

    Args:
      obj (Object): must have a key. If falsy, both are cleared to None.
    """
    if not obj:
        self._obj = self.obj_key = None
        return

    assert isinstance(obj, Object)
    assert obj.key
    self._obj = obj
    self.obj_key = obj.key
[docs]
def delete(self, proto=None):
    """Deletes a user's bridged actors in all protocols or a specific one.

    Builds a ``delete`` activity, with this user as both actor and object, and
    hands it to :meth:`deliver`.

    Args:
      proto (Protocol): optional. If omitted, the delete applies to all
        protocols.
    """
    timestamp = util.now().isoformat()
    label = 'all' if not proto else proto.LABEL
    delete_id = f'{self.profile_id()}#bridgy-fed-delete-user-{label}-{timestamp}'

    activity = {
        'id': delete_id,
        'objectType': 'activity',
        'verb': 'delete',
        'actor': self.key.id(),
        'object': self.key.id(),
    }
    delete_obj = Object(id=delete_id, source_protocol=self.LABEL,
                        our_as1=activity)
    self.deliver(delete_obj, from_user=self, to_proto=proto)
[docs]
@classmethod
def load_multi(cls, users):
    """Loads :attr:`obj` for multiple users in parallel.

    Fetches all of the users' profile objects with a single
    ``ndb.get_multi`` call and stores each one in its user's ``_obj``. Users
    whose profile object is missing get ``_obj`` set to None.

    Args:
      users (sequence of User): iterated twice, so must not be a one-shot
        iterator
    """
    profile_keys = (user.obj_key for user in users if user.obj_key)

    by_key = {}
    for profile in ndb.get_multi(profile_keys):
        if profile:
            by_key[profile.key] = profile

    for user in users:
        user._obj = by_key.get(user.obj_key)
@ndb.ComputedProperty
def handle(self):
    """This user's unique, human-chosen handle, eg ``@me@snarfed.org``.

    Abstract here; to be implemented by subclasses.

    Raises:
      NotImplementedError: always, in this base class
    """
    raise NotImplementedError
@ndb.ComputedProperty
def handle_as_domain(self):
    """This user's handle in domain-like format, via :func:`ids.handle_as_domain`.

    :attr:`verified_domain` takes precedence if it's set.

    The first time this is evaluated on an instance, it also remembers the
    ``stored_value`` of this property in ``_stored_handle_as_domain``, which
    :attr:`handle_pay_level_domain` compares against.

    Returns:
      str or None: if handle is None
    """
    if not hasattr(self, '_stored_handle_as_domain'):
        self._stored_handle_as_domain = stored_value(self, 'handle_as_domain')

    return self.verified_domain or ids.handle_as_domain(self.handle)
@ndb.ComputedProperty
def handle_pay_level_domain(self):
    """This user's handle's pay-level domain.

    Pay-level domains are domains at the registrar level, usually (but not
    always) one level below a TLD. For example, bar.com is the pay-level domain
    for both foo.bar.com and baz.bar.com, and bbc.co.uk is the pay-level domain
    for www.bbc.co.uk.

    WARNING: this is only set for the first accounts created before hitting the
    :attr:`Protocol.HANDLES_PER_PAY_LEVEL_DOMAIN` limit for their pay-level
    domain. Accounts after that limit will have 'too-many' as their value.

    Returns:
      str or None: the pay-level domain, :const:`OVER_LIMIT`, or None if
      :attr:`handle_as_domain` is unset or no pay-level domain can be
      extracted from it
    """
    # presumably the value of this property as last stored in the datastore;
    # verify against webutil.models.stored_value
    last_pld = stored_value(self, 'handle_pay_level_domain')

    # NOTE: order matters here. evaluating self.handle_as_domain first is what
    # populates self._stored_handle_as_domain.
    if self.handle_as_domain == self._stored_handle_as_domain:
        # handle is unchanged. use our existing stored value for
        # handle_pay_level_domain to avoid re-querying the datastore for other
        # users on the same domain
        return last_pld

    if self.handle_as_domain:
        if extract := domains.tldextract(self.handle_as_domain):
            if cur_pld := extract.top_domain_under_public_suffix:
                # same pay-level domain as before, or this protocol has no
                # limit: no need to count other users
                if cur_pld == last_pld or not self.HANDLES_PER_PAY_LEVEL_DOMAIN:
                    return cur_pld

                # count users on this pay-level domain whose status is unset.
                # (== None, not is None, because this builds an ndb query
                # filter.)
                num_others = self.query(User.handle_pay_level_domain == cur_pld,
                                        User.status == None).count()
                if num_others < self.HANDLES_PER_PAY_LEVEL_DOMAIN:
                    return cur_pld
                else:
                    return OVER_LIMIT
@ndb.ComputedProperty
def status(self):
    """Whether this user is blocked or opted out.

    Optional. See :attr:`USER_STATUS_DESCRIPTIONS` for possible values.

    The checks below run in precedence order; the first one that matches wins.

    Returns:
      str or None: None if the user is eligible for bridging
    """
    # TODO
    # if not hasattr(self, '_stored_status'):
    #     self._stored_status = stored_value(self, 'status')

    # manual_opt_out is tri-state: True forces opt out, False forces enabled
    # and skips every check below, None (unset) falls through to them
    if self.manual_opt_out:
        return 'opt-out'
    elif self.manual_opt_out is False:
        return None

    # TODO: require profile for more protocols? all?
    if not self.obj or not self.obj.as1:
        return None

    if self.obj.as1.get('bridgeable') is False:  # FEP-0036
        return 'opt-out'

    if self.REQUIRES_AVATAR and not self.obj.as1.get('image'):
        return 'requires-avatar'

    # a display name that's missing, or that just repeats the handle or id,
    # doesn't count as a name
    name = self.obj.as1.get('displayName')
    if self.REQUIRES_NAME and (not name or name in (self.handle, self.key.id())):
        return 'requires-name'

    # profiles without a published date pass this check
    if self.REQUIRES_OLD_ACCOUNT:
        if published := self.obj.as1.get('published'):
            if util.now() - util.parse_iso8601(published) < OLD_ACCOUNT_AGE:
                return 'requires-old-account'

    # https://swicg.github.io/miscellany/#movedTo
    # https://docs.joinmastodon.org/spec/activitypub/#as
    if self.obj.as1.get('movedTo'):
        return 'moved'

    # plain text versions of the bio and display name, for the hashtag checks.
    # note that name is rebound here from the raw display name above.
    summary = html_to_text(self.obj.as1.get('summary', ''), ignore_links=True)
    name = html_to_text(self.obj.as1.get('displayName', ''), ignore_links=True)

    # #nobridge overrides enabled_protocols
    if '#nobridge' in summary or '#nobridge' in name:
        return 'nobridge'

    if self.HANDLES_PER_PAY_LEVEL_DOMAIN:
        # TODO
        # if self._stored_status:
        #     self._values.pop('handle_pay_level_domain', None)
        if self.handle_pay_level_domain == OVER_LIMIT:
            return 'over-handle-domain-limit'

    # user has explicitly opted in. should go after spam filter (REQUIRES_*)
    # checks, but before is_public and #nobot
    #
    # !!! WARNING: keep in sync with User.enable_protocol!
    if self.enabled_protocols:
        return None

    if not as1.is_public(self.obj.as1, unlisted=False):
        return 'private'

    # enabled_protocols overrides #nobot
    if '#nobot' in summary or '#nobot' in name:
        return 'nobot'
[docs]
def status_description(self):
    """Returns a human-readable description of this user's status.

    Looks up :attr:`status` in :const:`USER_STATUS_DESCRIPTIONS` and formats
    the result with ``user=self``.

    Returns:
      str: or None if this user's status is None, or a description isn't
      available
    """
    template = USER_STATUS_DESCRIPTIONS.get(self.status)
    if not template:
        return None
    return template.format(user=self)
[docs]
def is_enabled(self, to_proto, explicit=False):
    """Returns True if this user is bridged to a given protocol.

    Reasons this might return False:

    * We haven't turned on bridging these two protocols yet.
    * The user is opted out or blocked.
    * The user is on a domain that's opted out or blocked.
    * The from protocol requires opt in, and the user hasn't opted in.
    * ``explicit`` is True, and this protocol supports ``to_proto`` by
      default, but the user hasn't explicitly opted into it.

    Args:
      to_proto (Protocol subclass)
      explicit (bool): whether to require that the user has explicitly opted
        in, ie ignore :attr:`DEFAULT_ENABLED_PROTOCOLS`

    Returns:
      bool:
    """
    from protocol import Protocol
    assert isinstance(to_proto, Protocol) or issubclass(to_proto, Protocol)

    # users are always "enabled" on their own protocol
    if self.__class__ == to_proto:
        return True

    to_label = to_proto.LABEL

    if bot_protocol := Protocol.for_bridgy_subdomain(self.key.id()):
        # ids on our own protocol subdomains are enabled everywhere except in
        # the protocol they represent
        return to_proto != bot_protocol

    elif self.manual_opt_out:
        return False

    elif to_label in self.enabled_protocols:
        # explicit opt in is checked before status, so it wins
        return True

    elif self.status:
        return False

    elif to_label in self.DEFAULT_ENABLED_PROTOCOLS and not explicit:
        return True

    return False
[docs]
def enable_protocol(self, to_proto):
    """Adds ``to_proto`` to :attr:`enabled_protocols`.

    Also sends a welcome DM to the user (via a send task) if their protocol
    supports DMs.

    If the user is ineligible, ie their :attr:`status` is set to something
    other than ``nobot`` or ``private``, or their handle can't be translated
    to ``to_proto``, DMs them the reason and calls ``common.error`` with
    status 299.

    Args:
      to_proto (:class:`protocol.Protocol` subclass)
    """
    import dms

    # explicit opt-in overrides some status
    # !!! WARNING: keep in sync with User.status!
    ineligible = """Hi! Your account isn't eligible for bridging yet because your {desc}. <a href="https://fed.brid.gy/docs#troubleshooting">More details here.</a> You can try again once that's fixed by unfollowing and re-following this account."""
    if self.status and self.status not in ('nobot', 'private'):
        # only DM the user if we have a human-readable reason to give them
        if desc := self.status_description():
            dms.maybe_send(from_=to_proto, to_user=self, type=self.status,
                           text=ineligible.format(desc=desc))
        # NOTE(review): presumably common.error raises and so aborts here;
        # confirm
        common.error(f'Nope, user {self.key.id()} is {self.status}', status=299)

    # check that our handle is supported in this protocol
    err = handle = None
    try:
        handle = self.handle_as(to_proto)
    except ValueError as e:
        err = e

    # covers both handle_as raising and it returning an empty handle
    if not handle:
        err_text = str(err) if err else 'handle is unset'
        dms.maybe_send(from_=to_proto, to_user=self,
                       type=f'unsupported-handle-{to_proto.ABBREV}',
                       text=ineligible.format(desc=err_text))
        common.error(err_text, status=299)

    # add to enabled_protocols in memory so that create_for (below) etc see it,
    # including its effects on status, but don't store to datastore until after
    # create_for in case it fails
    self.add('enabled_protocols', to_proto.LABEL)

    if to_proto.LABEL in ids.COPIES_PROTOCOLS:
        # do this even if there's an existing copy since we might need to
        # reactivate it, which create_for should do
        to_proto.create_for(self)

    dms.maybe_send(from_=to_proto, to_user=self, type='welcome', text=f"""Welcome to Bridgy Fed! Your account will soon be bridged to {to_proto.PHRASE} at {self.html_link(proto=to_proto, name=False)}. <a href="https://fed.brid.gy/docs">See the docs</a> and <a href="https://{PRIMARY_DOMAIN}{self.user_page_path()}">your user page</a> for more information. To disable this and delete your bridged profile, block this account.""")

    # store the new enabled protocol now that the steps above have succeeded
    self.put()

    common.create_task(queue='user-enabled', user=self.key.urlsafe(),
                       protocol=to_proto.LABEL)
    logger.info(f'Enabled {to_proto.LABEL} for {self.key.id()}')
[docs]
def disable_protocol(self, to_proto):
    """Removes ``to_proto`` from :attr:`enabled_protocols`.

    Stores this user afterward, then logs the change.

    Args:
      to_proto (:class:`protocol.Protocol` subclass)
    """
    label = to_proto.LABEL
    self.remove('enabled_protocols', label)
    self.put()

    logger.info(f'Disabled {label} for {self.key.id()} : {self.user_page_path()}')
[docs]
def handle_as(self, to_proto, short=False):
    """Returns this user's handle in a different protocol.

    Args:
      to_proto (str or Protocol): a protocol class, or a label to look up in
        ``PROTOCOLS``
      short (bool): whether to return the full handle or a shortened form.
        Default False. Currently only affects ActivityPub; returns just
        ``@[user]`` instead of ``@[user]@[domain]``

    Returns:
      str: or None if this user has no handle
    """
    if isinstance(to_proto, str):
        to_proto = PROTOCOLS[to_proto]

    # override to-ATProto to use custom domain handle in DID doc
    from atproto import ATProto, did_to_handle
    if to_proto == ATProto:
        did = self.get_copy(ATProto)
        if did:
            did_handle = did_to_handle(did, remote=False)
            if did_handle:
                return did_handle

    # override web users to always use domain instead of custom username
    # TODO: fall back to id if handle is unset?
    if self.LABEL == 'web':
        own_handle = self.key.id()
    else:
        own_handle = self.handle

    if not own_handle:
        return None

    return ids.translate_handle(handle=own_handle, from_=self.__class__,
                                to=to_proto, short=short)
[docs]
def id_as(self, to_proto):
    """Returns this user's id in a different protocol.

    Args:
      to_proto (str or Protocol): a protocol class, or a label to look up in
        ``PROTOCOLS``

    Returns:
      str
    """
    target = PROTOCOLS[to_proto] if isinstance(to_proto, str) else to_proto
    return ids.translate_user_id(id=self.key.id(), from_=self.__class__,
                                 to=target)
[docs]
def handle_or_id(self):
    """Returns handle if we know it, otherwise id.

    Returns:
      str
    """
    handle = self.handle
    if handle:
        return handle
    return self.key.id()
def _keypair(self, protocol):
"""Returns this user's :class:`KeyPair` for ``protocol``, or None."""
for kp in self.keypairs:
if kp.protocol == protocol:
return kp
[docs]
@memcache.memoize(key=lambda self: self.key.id())
def public_pem(self):
    """Returns the user's PEM-encoded ActivityPub public RSA key.

    Generates and stores the key pair first if the user doesn't have one yet.
    Memoized in memcache, keyed on this user's id.

    Returns:
      bytes:
    """
    self._maybe_generate_ap_key()
    keypair = self._keypair('activitypub')
    return keypair.public_key_bytes
[docs]
@memcache.memoize(key=lambda self: self.key.id())
def private_pem(self):
    """Returns the user's PEM-encoded ActivityPub private RSA key.

    Generates and stores the key pair first if the user doesn't have one yet.
    Memoized in memcache, keyed on this user's id.

    Returns:
      bytes:
    """
    self._maybe_generate_ap_key()
    keypair = self._keypair('activitypub')
    return keypair.private_key_bytes
def _maybe_generate_ap_key(self):
    """Generates this user's ActivityPub RSA key pair if necessary.

    No-op if the user already has an ``activitypub`` key pair. Otherwise
    generates a ``KEY_BITS`` bit key, appends it to :attr:`keypairs` with both
    halves PEM-encoded, and stores this user.
    """
    if self._keypair('activitypub'):
        return

    logger.info(f'generating AP keypair for {self.key.id()}')

    # in DEBUG, draw key material from the random module instead of the default
    randfunc = random.randbytes if DEBUG else None
    rsa_key = RSA.generate(KEY_BITS, randfunc=randfunc)

    public_pem = rsa_key.publickey().exportKey(format='PEM')
    private_pem = rsa_key.exportKey(format='PEM')
    self.keypairs.append(KeyPair(protocol='activitypub', algorithm='rsa',
                                 public_key_bytes=public_pem,
                                 private_key_bytes=private_pem))
    self.put()
[docs]
def nsec(self):
    """Returns the user's bech32-encoded Nostr private secp256k1 key.

    Generates and stores the key pair first if the user doesn't have one yet.

    Returns:
      str:
    """
    self._maybe_generate_nostr_key()

    key_bytes = self._keypair('nostr').private_key_bytes
    privkey = secp256k1.PrivateKey(key_bytes, raw=True)
    return granary.nostr.bech32_encode('nsec', privkey.serialize())
[docs]
def hex_pubkey(self):
    """Returns the user's hex-encoded Nostr public secp256k1 key.

    Generates and stores the key pair first if the user doesn't have one yet.

    Returns:
      str:
    """
    self._maybe_generate_nostr_key()
    pubkey_bytes = self._keypair('nostr').public_key_bytes
    return pubkey_bytes.hex()
[docs]
def npub(self):
    """Returns the user's bech32-encoded Nostr public secp256k1 key.

    Encodes :meth:`hex_pubkey` with the ``npub`` prefix.

    Returns:
      str:
    """
    pubkey_hex = self.hex_pubkey()
    return granary.nostr.bech32_encode('npub', pubkey_hex)
def _maybe_generate_nostr_key(self):
    """Generates this user's Nostr secp256k1 key pair if necessary.

    No-op if the user already has a ``nostr`` key pair. Otherwise generates
    one, appends it to :attr:`keypairs`, and stores this user.
    """
    if self._keypair('nostr'):
        return

    logger.info(f'generating Nostr keypair for {self.key.id()}')

    privkey_bytes = secp256k1.PrivateKey().private_key
    pubkey_hex = granary.nostr.pubkey_from_privkey(privkey_bytes.hex())
    self.keypairs.append(KeyPair(protocol='nostr', algorithm='secp256k1',
                                 public_key_bytes=bytes.fromhex(pubkey_hex),
                                 private_key_bytes=privkey_bytes))
    self.put()
[docs]
def farcaster_key(self):
    """Returns the user's Farcaster signing key.

    Generates and stores the key pair first if the user doesn't have one yet.

    TODO: real per-user signer keys, registered on-chain via the
    KeyRegistry. Messages signed with these stub keys will be rejected by
    the hub.

    Returns:
      cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey:
    """
    self._maybe_generate_farcaster_key()
    key_bytes = self._keypair('farcaster').private_key_bytes
    return Ed25519PrivateKey.from_private_bytes(key_bytes)
def _maybe_generate_farcaster_key(self):
    """Generates this user's Farcaster Ed25519 key pair if necessary.

    No-op if the user already has a ``farcaster`` key pair. Otherwise
    generates one, appends it to :attr:`keypairs` with both halves as raw
    bytes, and stores this user.
    """
    if self._keypair('farcaster'):
        return

    logger.info(f'generating Farcaster keypair for {self.key.id()}')

    privkey = Ed25519PrivateKey.generate()
    public_bytes = privkey.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
    private_bytes = privkey.private_bytes(Encoding.Raw, PrivateFormat.Raw,
                                          NoEncryption())
    self.keypairs.append(KeyPair(protocol='farcaster', algorithm='ed25519',
                                 public_key_bytes=public_bytes,
                                 private_key_bytes=private_bytes))
    self.put()
[docs]
def name(self):
    """Returns this user's human-readable name, eg ``Ryan Barrett``.

    Uses the profile's ``displayName`` if it has one; falls back to
    :meth:`handle_or_id` otherwise.

    Returns:
      str
    """
    profile = self.obj
    if profile and profile.as1:
        display_name = profile.as1.get('displayName')
        if display_name:
            return display_name

    return self.handle_or_id()
[docs]
def web_url(self):
    """Returns this user's user-facing profile page URL.

    ...eg ``https://bsky.app/profile/snarfed.org`` or ``https://foo.com/``.

    Abstract here; to be implemented by subclasses.

    Returns:
      str

    Raises:
      NotImplementedError: always, in this base class
    """
    raise NotImplementedError
[docs]
def is_web_url(self, url, ignore_www=False):
    """Returns True if the given URL is this user's web URL (homepage).

    The comparison ignores trailing slashes, a leading ``www.`` subdomain, and
    http vs https. ``url`` may also be just this user's bare domain.

    Args:
      url (str)
      ignore_www (bool): currently unused. A leading ``www.`` is always
        ignored, on both URLs, regardless of this value. Kept for backward
        compatibility.

    Returns:
      bool: False if ``url`` is empty, isn't http(s), or this user has no web
      URL
    """
    if not url:
        return False

    url = url.strip().rstrip('/')
    url = re.sub(r'^(https?://)www\.', r'\1', url)
    parsed_url = urlparse(url)
    if parsed_url.scheme not in ('http', 'https', ''):
        return False

    this = self.web_url()
    if not this:
        # no web URL to compare against, so nothing can match it
        return False

    this = re.sub(r'^(https?://)www\.', r'\1', this.rstrip('/'))
    parsed_this = urlparse(this)

    return (url == this or url == parsed_this.netloc or
            parsed_url[1:] == parsed_this[1:])  # ignore http vs https
[docs]
def id_uri(self):
    """Returns the user id as a URI.

    Sometimes this is the user id itself, eg ActivityPub actor ids.
    Sometimes it's a bit different, eg at://did:plc:... for ATProto user,
    https://site.com for Web users.

    This base implementation returns the key id as is.

    Returns:
      str
    """
    user_id = self.key.id()
    return user_id
[docs]
def profile_id(self):
    """Returns the id of this user's profile object in its native protocol.

    Delegates to :func:`ids.profile_id` with this user's key id and this
    user as the protocol.

    Examples:

    * Web: home page URL, eg ``https://me.com/``
    * ActivityPub: actor URL, eg ``https://instance.com/users/me``
    * ATProto: profile AT URI, eg
      ``at://did:plc:123/app.bsky.actor.profile/self``

    Returns:
      str or None:
    """
    user_id = self.key.id()
    return ids.profile_id(id=user_id, proto=self)
[docs]
def is_profile(self, obj):
    """Returns True if ``obj`` is this user's profile/actor, False otherwise.

    ``obj`` matches if its key id, or the id of its AS1 object, is this
    user's key id, profile id, or stored profile object key id. For
    create/update/delete style activities (``as1.CRUD_VERBS``), the id of
    the activity's inner object is checked instead of the activity's own id.

    Args:
      obj (Object)

    Returns:
      bool:
    """
    self_ids = [self.key.id(), self.profile_id()]
    if self.obj_key:
        self_ids.append(self.obj_key.id())

    if obj.key and obj.key.id() in self_ids:
        return True

    # evaluate the as1 property only once
    obj_as1 = obj.as1
    if not obj_as1:
        return False

    if obj_as1.get('verb') in as1.CRUD_VERBS:
        obj_as1 = as1.get_object(obj_as1)

    return obj_as1.get('id') in self_ids
[docs]
def reload_profile(self, raise_=False, **kwargs):
    """Reloads this user's identity and profile from their native protocol.

    Populates the reloaded profile :class:`Object` in ``self.obj``, then
    writes this user to the datastore.

    Args:
      raise_ (bool): passed through to :meth:`Protocol.load`. If True, and
        :meth:`Protocol.load` returns None when fetching the user's profile,
        this method raises :class:`RuntimeError`. If False, a failed load
        leaves ``self.obj`` unchanged.
      kwargs: passed through to :meth:`Protocol.load`

    Raises:
      RuntimeError: if ``raise_`` is True and the user's profile can't be
        loaded
      AssertionError: if the loaded object has a type that isn't an AS1
        actor type
    """
    profile_id = self.profile_id()
    obj = self.load(profile_id, remote=True, raise_=raise_, **kwargs)

    if obj:
        if obj.type:
            assert obj.type in as1.ACTOR_TYPES, obj.type
        self.obj = obj
    elif raise_:
        raise RuntimeError(f"Couldn't load {profile_id} on {self.PHRASE}")

    # write the user so that we re-populate any computed properties
    self.put()
[docs]
def user_page_path(self, rest=None, prefer_id=False):
    """Returns the user's Bridgy Fed user page path.

    The path is ``/[ABBREV]/[handle or id]``, optionally followed by
    ``rest``. A ``/`` separator is inserted before ``rest`` unless it
    already starts with ``/`` or ``?``.

    Args:
      rest (str): additional path and/or query to add to the end
      prefer_id (bool): whether to prefer to use the account's id in the
        path instead of handle. Defaults to ``False``.

    Returns:
      str
    """
    ident = self.key.id() if prefer_id else self.handle_or_id()
    pieces = [f'/{self.ABBREV}/{ident}']

    if rest:
        if not rest.startswith(('?', '/')):
            pieces.append('/')
        pieces.append(rest)

    return ''.join(pieces)
[docs]
def get_copy(self, proto):
    """Returns the id for the copy of this user in a given protocol.

    If ``proto`` is this user's own protocol, returns this user's key id.
    Otherwise returns the URI of the first entry in :attr:`copies` whose
    protocol is ``proto``'s label or abbreviation, or None if there is none.

    Args:
      proto: :class:`Protocol` subclass

    Returns:
      str or None:
    """
    # compare labels instead of using isinstance because the testutil Fake
    # protocol has subclasses
    if self.LABEL == proto.LABEL:
        return self.key.id()

    matching = (target.uri for target in self.copies
                if target.protocol in (proto.LABEL, proto.ABBREV))
    return next(matching, None)
[docs]
def html_link(self, name=True, handle=True, pictures=False, logo=None,
              proto=None, proto_fallback=False):
    """Returns a pretty HTML link to the user's profile.

    Can optionally include display name, handle, profile picture, and/or
    link to a different protocol that they've enabled.

    The display name, handles, profile URL, and picture URL come from the
    user's profile, so they're HTML-escaped before being interpolated into
    the returned markup.

    TODO: unify with :meth:`Object.actor_link`?

    Args:
      name (bool): include display name
      handle (bool): True to include handle, False to exclude it,
        ``'short'`` to include a shortened version, if available
      pictures (bool): include profile picture and protocol logo
      logo (str): optional path to platform logo to show instead of the
        protocol's default
      proto (protocol.Protocol): link to this protocol instead of the user's
        native protocol
      proto_fallback (bool): if True, and ``proto`` is provided and has no
        canonical profile URL for bridged users, uses the user's profile
        URL in their native protocol

    Returns:
      str: HTML

    Raises:
      AssertionError: if ``proto`` is provided but isn't enabled for this
        user
    """
    from html import escape

    img = name_str = full_handle = handle_str = dot = logo_html = a_open = a_close = ''

    if proto:
        assert self.is_enabled(proto), f"{proto.LABEL} isn't enabled"
        url = proto.bridged_web_url_for(self, fallback=proto_fallback)
    else:
        proto = self.__class__
        url = self.web_url()

    if pictures:
        # logo and LOGO_HTML/LOGO_EMOJI are supplied by our own code, not by
        # the remote profile, so they're inserted as is
        if logo:
            logo_html = f'<img class="logo" title="{proto.__name__}" src="{logo}" /> '
        else:
            logo_html = f'<span class="logo" title="{proto.__name__}">{proto.LOGO_HTML or proto.LOGO_EMOJI}</span> '
        if pic := self.profile_picture():
            img = f'<img src="{escape(pic)}" class="profile"> '

    if handle:
        full_handle = self.handle_as(proto) or ''
        handle_str = self.handle_as(proto, short=(handle == 'short')) or ''

    if name:
        # skip the name if it would just duplicate the handle
        display_name = self.name()
        if display_name != full_handle:
            name_str = display_name or ''

    handle_str = ellipsize(handle_str, chars=40)

    if handle_str and name_str:
        dot = ' · '

    if url:
        title = escape(f'{name_str}{dot}{full_handle}')
        a_open = f'<a class="h-card u-author mention" rel="me" href="{escape(url)}" title="{title}">'
        a_close = '</a>'

    # truncate first, then escape, so that we never cut an entity in half
    name_html = (
        f'<span style="unicode-bidi: isolate">{escape(ellipsize(name_str, chars=40))}</span>'
        if name_str else '')

    return f'{logo_html}{a_open}{img}{name_html}{dot}{escape(handle_str)}{a_close}'
[docs]
def profile_picture(self):
    """Returns the user's profile picture image URL, if available, or None.

    Looks up the ``image`` field of the profile object's AS1 via
    ``util.get_url``.
    """
    if not (self.obj and self.obj.as1):
        return None

    return util.get_url(self.obj.as1, 'image')
# can't use functools.lru_cache here because we want the cache key to be
# just the user id, not the whole entity
[docs]
@cachetools.cached(
    cachetools.TTLCache(50000, FOLLOWERS_CACHE_EXPIRATION.total_seconds()),
    key=lambda user: user.key.id(), lock=Lock())
@memcache.memoize(key=lambda self: self.key.id(),
                  expire=FOLLOWERS_CACHE_EXPIRATION)
def count_followers(self):
    """Counts this user's active followers and followings.

    Results are cached by user id, both in process and via
    ``memcache.memoize``, for ``FOLLOWERS_CACHE_EXPIRATION``.

    Returns:
      (int, int) tuple: (number of followers, number following)
    """
    if self.key.id() in PROTOCOL_DOMAINS:
        # we don't store Followers for protocol bot users any more, so
        # follower counts are inaccurate, so don't return them
        return (0, 0)

    def count_active_async(user_prop):
        """Starts an async count of active Followers with this user in user_prop."""
        return Follower.query(user_prop == self.key,
                              Follower.status == 'active').count_async()

    # start both queries before waiting on either one
    followers_future = count_active_async(Follower.to)
    following_future = count_active_async(Follower.from_)
    return followers_future.get_result(), following_future.get_result()
[docs]
def is_blocking(self, user_or_id):
    """Returns True if this user is blocking ``user_or_id``, False otherwise.

    Looks at domain blocklists in :attr:`blocks`. Eventually we can add
    support for blocking individual users in that too.

    Args:
      user_or_id (User or str): a user, URL, or domain. Anything else is
        never considered blocked.

    Returns:
      bool:
    """
    if not user_or_id:
        return False

    if not (isinstance(user_or_id, User)
            or util.is_url(user_or_id)
            or DOMAIN_RE.fullmatch(user_or_id)):
        return False

    blocklist_keys = [key for key in self.blocks if key.kind() == 'Object']
    for blocklist in ndb.get_multi(blocklist_keys):
        # get_multi returns None for keys whose entities don't exist, eg a
        # blocklist Object that has since been deleted
        if blocklist and blocklist.domain_blocklist_matches(user_or_id):
            logger.info(f'{self.key.id()} is blocking {user_or_id}')
            return True

    return False
[docs]
def add_domain_blocklist(self, url):
    """Adds a domain blocklist to this user.

    If the blocklist's key is already in :attr:`blocks`, returns the stored
    :class:`Object` without loading anything. Otherwise loads the CSV at
    the given URL and, if that succeeds, appends its key to :attr:`blocks`
    and writes this user.

    Args:
      url (str): URL of CSV blocklist to add

    Returns:
      Object: CSV blocklist, or None if it couldn't be loaded
    """
    from web import Web

    blocklist_key = Object(id=maybe_truncate_key_id(url)).key
    if blocklist_key in self.blocks:
        return blocklist_key.get()

    loaded = Web.load(url, csv=True)
    if not loaded:
        return None

    self.blocks.append(loaded.key)
    self.put()
    return loaded
[docs]
def remove_domain_blocklist(self, url):
    """Removes a domain blocklist from this user.

    If the blocklist's key is in :attr:`blocks`, removes it, writes this
    user, and returns the stored :class:`Object`. Otherwise nothing is
    changed, and the blocklist is loaded from ``url`` and returned.

    Args:
      url (str): URL of CSV blocklist to remove

    Returns:
      Object: CSV blocklist, or None if it couldn't be loaded
    """
    from web import Web

    blocklist_key = Object(id=maybe_truncate_key_id(url)).key
    if blocklist_key not in self.blocks:
        return Web.load(url, csv=True) or None

    self.blocks.remove(blocklist_key)
    self.put()
    return blocklist_key.get()
# WARNING: AddRemoveMixin *must* be before StringIdModel here so that its __init__
# gets called! Due to an (arguable) ndb.Model bug:
# https://github.com/googleapis/python-ndb/issues/1025
[docs]
class Object(AddRemoveMixin, StringIdModel):
    """An activity or other object, eg actor.

    Key name is the id, generally a URI. We synthesize ids if necessary.

    The object's data is stored in one of several format-specific
    properties, eg :attr:`as2`, :attr:`bsky`, :attr:`mf2`, :attr:`nostr`,
    :attr:`farcaster`, or :attr:`our_as1`. The :attr:`as1` property converts
    whichever one is populated to ActivityStreams 1.
    """
    GET_ORIGINAL_FN = get_original_object_key
    'used by AddRemoveMixin'

    users = ndb.KeyProperty(repeated=True)
    'User(s) who created or otherwise own this object.'

    notify = ndb.KeyProperty(repeated=True)
    """User who should see this in their user page, eg in reply to, reaction to,
    share of, etc.
    """
    feed = ndb.KeyProperty(repeated=True)
    'User who should see this in their feeds, eg followers of its creator'

    source_protocol = ndb.StringProperty(choices=list(PROTOCOLS.keys()))
    """The protocol this object originally came from.
    TODO: nail down whether this is :attr:`ABBREV`` or :attr:`LABEL`
    """

    # TODO: switch back to ndb.JsonProperty if/when they fix it for the web console
    # https://github.com/googleapis/python-ndb/issues/874
    as2 = JsonProperty()
    'ActivityStreams 2, for ActivityPub'
    bsky = JsonProperty()
    'AT Protocol lexicon, for Bluesky'
    csv = ndb.TextProperty()
    'Other standalone CSV data, eg domain blocklist.'
    farcaster = ndb.BlobProperty(repeated=True)
    """List of binary serialized Farcaster :class:`Message` protobufs.
    Each blob is ``SerializeToString()`` and decodes via ``Message.FromString(blob)``.
    Repeated to support actors as multiple ``USER_DATA_ADD`` messages.
    """
    mf2 = JsonProperty()
    'HTML microformats2 item (*not* top level parse object with ``items`` field)'
    nostr = JsonProperty()
    'Nostr event'
    our_as1 = JsonProperty()
    'ActivityStreams 1, for activities that we generate or modify ourselves'
    raw = JsonProperty()
    'Other standalone data format, eg DID document'
    extra_as1 = JsonProperty()
    "Additional individual fields to merge into this object's AS1 representation"

    is_csv = ndb.BooleanProperty()
    "Whether this object is a CSV. Needed because :attr:`csv` isn't indexed."

    # TODO: remove and actually delete Objects instead!
    deleted = ndb.BooleanProperty()
    ''

    copies = ndb.StructuredProperty(Target, repeated=True)
    """Copies of this object elsewhere, eg at:// URIs for ATProto records and
    nevent etc bech32-encoded Nostr ids, where this object is the original.
    Similar to u-syndication links in microformats2 and
    upstream/downstreamDuplicates in AS1.
    """

    created = ndb.DateTimeProperty(auto_now_add=True)
    ''
    updated = ndb.DateTimeProperty(auto_now=True)
    ''

    # new and changed are plain class attributes, not ndb properties. Within
    # this class they're assigned by get_or_create.
    new = None
    """True if this object is new, ie this is the first time we've seen it,
    False otherwise, None if we don't know.
    """
    changed = None
    """True if this object's contents have changed from our existing copy in the
    datastore, False otherwise, None if we don't know. :class:`Object` is
    new/changed. See :meth:`activity_changed()` for more details.
    """

    # DEPRECATED
    # These were for full feeds with multiple items, not just this one, so they were
    # stored as audit records only, not used in to_as1. for Atom/RSS
    # based Objects, our_as1 was populated with an feed_index top-level
    # integer field that indexed into one of these.
    #
    # atom = ndb.TextProperty() # Atom XML
    # rss = ndb.TextProperty()  # RSS XML

    # DEPRECATED; these were for delivery tracking, but they were too expensive,
    # so we stopped: https://github.com/snarfed/bridgy-fed/issues/1501
    #
    # STATUSES = ('new', 'in progress', 'complete', 'failed', 'ignored')
    # status = ndb.StringProperty(choices=STATUSES)
    # delivered = ndb.StructuredProperty(Target, repeated=True)
    # undelivered = ndb.StructuredProperty(Target, repeated=True)
    # failed = ndb.StructuredProperty(Target, repeated=True)

    # DEPRECATED but still used read only to maintain backward compatibility
    # with old Objects in the datastore that we haven't bothered migrating.
    #
    # domains = ndb.StringProperty(repeated=True)

    # DEPRECATED; replaced by :attr:`users`, :attr:`notify`, :attr:`feed`
    #
    # labels = ndb.StringProperty(repeated=True,
    #                             choices=('activity', 'feed', 'notification', 'user'))
@property
def as1(self):
    """Returns this object's ActivityStreams 1 representation.

    Converts the first populated property of, in order, :attr:`our_as1`,
    :attr:`as2`, :attr:`bsky`, :attr:`mf2`, :attr:`nostr`, and
    :attr:`farcaster`, then fills in a missing ``id`` from the key, prefixes
    image URLs for ids in ``IMAGE_PROXY_DOMAINS``, and merges in
    :attr:`extra_as1`.

    The conversion runs on every access; the result isn't cached here.

    Returns:
      dict: AS1 object, or None if none of the properties above is
      populated, or if :attr:`bsky` couldn't be converted
    """
    from protocol import Protocol

    def use_urls_as_ids(obj):
        """If id field is missing or not a URL, use the url field."""
        id = obj.get('id')
        if not id or not (util.is_web(id) or DOMAIN_RE.fullmatch(id)):
            if url := util.get_url(obj):
                obj['id'] = url

        # recurse into nested actors and objects
        for field in 'author', 'actor', 'object':
            if inner := as1.get_object(obj, field):
                use_urls_as_ids(inner)

    if self.our_as1:
        # NOTE(review): obj is the our_as1 dict itself here, not a copy, so
        # the setdefault/prefix_urls/update calls at the end of this method
        # presumably modify our_as1 in place. confirm that's intended.
        obj = self.our_as1
        if self.source_protocol == 'web':
            use_urls_as_ids(obj)

    elif self.as2:
        obj = as2.to_as1(unwrap(self.as2))

    elif self.bsky:
        # the AT URI's repo is the owner of the record
        owner, _, _ = parse_at_uri(self.key.id())
        ATProto = PROTOCOLS['atproto']
        handle = ATProto(id=owner).handle
        try:
            obj = bluesky.to_as1(self.bsky, repo_did=owner, repo_handle=handle,
                                 uri=self.key.id(), pds=ATProto.pds_for(self))
        except (ValueError, RequestException):
            logger.info(f"Couldn't convert to AS1", exc_info=True)
            return None

    elif self.mf2:
        obj = microformats2.json_to_object(self.mf2,
                                           rel_urls=self.mf2.get('rel-urls'))
        use_urls_as_ids(obj)

        # use fetched final URL as id, not u-url
        # https://github.com/snarfed/bridgy-fed/issues/829
        if url := self.mf2.get('url'):
            obj['id'] = (self.key.id() if self.key and '#' in self.key.id()
                         else url)

        # objects served from a Bridgy Fed protocol subdomain whose owner or
        # id is on one of BLOG_REDIRECT_DOMAINS get attributed to the
        # protocol's bot user and identified by this object's key id instead
        if self.key and (proto := Protocol.for_bridgy_subdomain(self.key.id())):
            if util.domain_or_parent_in(as1.get_owner(obj), BLOG_REDIRECT_DOMAINS):
                logger.debug(f'overriding actor/author with {proto.bot_user_id()}')
                obj['actor'] = obj['author'] = proto.bot_user_id()
            if util.domain_or_parent_in(obj.get('id'), BLOG_REDIRECT_DOMAINS):
                logger.debug(f'overriding id/url with {self.key.id()}')
                obj['id'] = obj['url'] = self.key.id()

    elif self.nostr:
        obj = granary.nostr.to_as1(self.nostr)

    elif self.farcaster:
        msgs = [Message.FromString(b) for b in self.farcaster]
        # multiple messages are converted together as a MessagesResponse, a
        # single message is converted on its own
        obj = granary.farcaster.to_as1(MessagesResponse(messages=msgs)
                                       if len(msgs) > 1 else msgs[0])

    else:
        return None

    # populate id if necessary
    if self.key:
        obj.setdefault('id', self.key.id())

    if util.domain_or_parent_in(obj.get('id'), IMAGE_PROXY_DOMAINS):
        as1.prefix_urls(obj, 'image', IMAGE_PROXY_URL_BASE)

    # extra_as1 fields override converted fields with the same name
    if self.extra_as1:
        obj.update(self.extra_as1)

    return obj
@ndb.ComputedProperty
def type(self):  # AS1 objectType, or verb if it's an activity
    """Returns this object's AS1 type, as determined by ``as1.object_type``.

    Returns:
      str: or None if this object has no AS1 representation
    """
    # as1 is a property that converts on every access, so evaluate it once
    if obj_as1 := self.as1:
        return as1.object_type(obj_as1)

    return None
def _expire(self):
    """Returns when this Object should be automatically deleted, if ever.

    Used as the value of the :attr:`expire` computed property, for a
    datastore TTL policy: https://cloud.google.com/datastore/docs/ttl

    They recommend not indexing TTL properties:
    https://cloud.google.com/datastore/docs/ttl#ttl_properties_and_indexes

    The expiration is measured from :attr:`updated`, or now if that's unset:

    * deleted objects expire after one day
    * ``internal:`` ids, :attr:`raw` and CSV objects, types in
      ``DONT_EXPIRE_OBJECT_TYPES``, and objects with :attr:`copies` never
      expire
    * types in ``EXPIRE_LATE_OBJECT_TYPES`` expire after
      ``OBJECT_LATE_EXPIRE_AGE``
    * everything else expires after ``OBJECT_EARLY_EXPIRE_AGE``

    Returns:
      datetime.datetime: or None if this object should never expire
    """
    base = self.updated or util.now()

    if self.deleted:
        return base + timedelta(days=1)

    never_expires = (self.key.id().startswith('internal:')
                     or self.raw
                     or self.is_csv
                     or self.type in DONT_EXPIRE_OBJECT_TYPES
                     or self.copies)
    if never_expires:
        return None

    lifetime = (OBJECT_LATE_EXPIRE_AGE
                if self.type in EXPIRE_LATE_OBJECT_TYPES
                else OBJECT_EARLY_EXPIRE_AGE)
    return base + lifetime

expire = ndb.ComputedProperty(_expire, indexed=False)
def _pre_put_hook(self):
    """Validates and cleans up this object before it's written.

    * Validate that multi-element farcaster lists are all USER_DATA_ADD messages
    * Strip @context from as2 (we don't do LD) to save disk space
    * Validate that this object's id, and each copy's id, isn't rejected by
      its protocol's ``owns_id``
    * Validate that Nostr ids are nostr:[hex] ids
    * Validate that ATProto ids are DIDs or at:// URIs, and that at:// URIs
      have DIDs

    Raises:
      AssertionError: if one of the validations above fails
      ValueError: if an at:// URI id's repo isn't a DID
    """
    if len(self.farcaster) > 1:
        for msg in self.farcaster:
            data = Message.FromString(msg).data
            assert data.type == MESSAGE_TYPE_USER_DATA_ADD, f'multi-element farcaster lists must be all USER_DATA_ADD messages; got {data}'

    if self.as2:
        # strip @context from the top level and from these nested objects
        self.as2.pop('@context', None)
        for field in 'actor', 'attributedTo', 'author', 'object':
            for val in util.get_list(self.as2, field):
                if isinstance(val, dict):
                    val.pop('@context', None)

    def check_id(id, proto):
        """Checks that id is valid for the protocol with the given label."""
        # nothing to check against for unknown or UI-originated objects
        if proto in (None, 'ui'):
            return

        # owns_id returning None is allowed here; only an explicit False fails
        assert PROTOCOLS[proto].owns_id(id) is not False, \
            f'Protocol {PROTOCOLS[proto].LABEL} does not own id {id}'

        if proto == 'nostr':
            assert id.startswith('nostr:'), id
            assert granary.nostr.ID_RE.match(id.removeprefix('nostr:')), id

        elif proto == 'atproto':
            assert id.startswith('at://') or id.startswith('did:'), id
            if id.startswith('at://'):
                repo, _, _ = parse_at_uri(id)
                if not repo.startswith('did:'):
                    # TODO: if we hit this, that means the AppView gave us an AT
                    # URI with a handle repo/authority instead of DID. that's
                    # surprising! ...if so, and if we need to handle it, add a
                    # new arroba.did.canonicalize_at_uri() function, then use it
                    # here, or before.
                    raise ValueError(f'at:// URI ids must have DID repos; got {id}')

    check_id(self.key.id(), self.source_protocol)
    for target in self.copies:
        check_id(target.uri, target.protocol)
def _post_put_hook(self, future):
    """Logs this object's key at debug level.

    Args:
      future: not used here
    """
    # TODO: assert that as1 id is same as key id? in pre put hook?
    logger.debug(f'Wrote {self.key}')
[docs]
def html_link(self):
    """Returns an HTML link to this object's user-facing web URL, if any.

    Links to the AS1 ``url`` field, falling back to this object's key id,
    and uses the AS1 ``displayName`` as the link text. :attr:`extra_as1`
    fields take precedence over converted AS1 fields.

    Returns:
      str or None:
    """
    fields = self.as1 or {}

    extras = self.extra_as1
    if extras:
        fields.update(extras)

    target = fields.get('url') or self.key.id()
    return util.pretty_link(target, text=fields.get('displayName'))
[docs]
@classmethod
def get_by_id(cls, id, authed_as=None, **kwargs):
    """Fetches the :class:`Object` with the given id, if it exists.

    The id is passed through ``maybe_truncate_key_id`` before lookup.

    Args:
      id (str)
      authed_as (str): optional; if provided, and a matching :class:`Object`
        already exists, its ``author`` or ``actor`` must contain this actor
        id. Implements basic authorization for updates and deletes.
      kwargs: passed through to the superclass's ``get_by_id``

    Returns:
      Object:

    Raises:
      :class:`werkzeug.exceptions.Forbidden` if ``authed_as`` doesn't match
        the existing object
    """
    obj = super().get_by_id(maybe_truncate_key_id(id), **kwargs)

    # the check only applies to existing objects that have AS1 content
    if obj and obj.as1 and authed_as:
        # authorization: check that the authed user is allowed to modify
        # this object
        # https://www.w3.org/wiki/ActivityPub/Primer/Authentication_Authorization
        proto = obj.owner_protocol()
        assert proto, obj.source_protocol

        # owners are the object's authors and actors, normalized
        owners = [ids.normalize_user_id(id=owner, proto=proto)
                  for owner in (as1.get_ids(obj.as1, 'author')
                                + as1.get_ids(obj.as1, 'actor'))
                  if owner]
        # actors own themselves
        if obj.type in as1.ACTOR_TYPES:
            owners.append(id)

        # authed_as may match as either a user id or a profile id
        user_id = ids.normalize_user_id(id=authed_as, proto=proto)
        profile_id = ids.profile_id(id=authed_as, proto=proto)

        # objects with no known owners pass, and so do requests authed as
        # PRIMARY_DOMAIN or one of PROTOCOL_DOMAINS
        if (owners and user_id not in owners and profile_id not in owners
                and authed_as not in (PRIMARY_DOMAIN,) + PROTOCOL_DOMAINS):
            report_error("Auth: Object: authed_as doesn't match owner",
                         user=f'{user_id} {profile_id} authed_as {authed_as} owners {owners}')
            error(f"authed user {authed_as} ({user_id} {profile_id}) isn't object owner {owners}",
                  status=403)

    return obj
[docs]
@classmethod
def get_or_create(cls, id, authed_as=None, **props):
    """Returns an :class:`Object` with the given property values.

    If a matching :class:`Object` doesn't exist in the datastore, creates it
    first with all of ``props``. If one does exist, elements of the repeated
    ``copies``, ``feed``, ``notify``, and ``users`` props are merged into
    it, and every other prop whose value isn't None and differs from the
    stored value overwrites it. The object is only written if something
    changed. Also populates the :attr:`new` and :attr:`changed` properties.

    Not transactional because transactions don't read or write memcache. :/
    Fortunately we don't really depend on atomicity for much, last writer wins
    is usually fine.

    Args:
      id (str): passed through ``maybe_truncate_key_id`` before lookup
      authed_as (str): optional; if provided, and a matching :class:`Object`
        already exists, its ``author`` or ``actor`` must contain this actor
        id. Implements basic authorization for updates and deletes.
      props: property values

    Returns:
      Object:

    Raises:
      :class:`werkzeug.exceptions.Forbidden` if ``authed_as`` doesn't match
        the existing object
      AssertionError: if the existing object has AS1 content and
        ``authed_as`` isn't provided, or if ``props`` includes a computed
        property
    """
    key_id = maybe_truncate_key_id(id)
    obj = cls.get_by_id(key_id, authed_as=authed_as)

    if not obj:
        obj = Object(id=key_id, **props)
        obj.new = True
        obj.changed = False
        obj.put()
        return obj

    # remember the AS1 from before we apply props, to compare against below
    if orig_as1 := obj.as1:
        # get_by_id() checks authorization if authed_as is set. make sure
        # it's always set for existing objects.
        assert authed_as

    dirty = False
    for prop, val in props.items():
        assert not isinstance(getattr(Object, prop), ndb.ComputedProperty)
        if prop in ('copies', 'feed', 'notify', 'users'):
            # merge repeated fields
            for elem in val:
                if obj.add(prop, elem):
                    dirty = True
        elif val is not None and val != getattr(obj, prop):
            setattr(obj, prop, val)
            # as1 prefers our_as1 over the other formats, so clear it when
            # one of them is replaced, unless the caller also provided a
            # new our_as1
            if (prop in ('as2', 'bsky', 'csv', 'mf2', 'nostr', 'raw')
                    and not props.get('our_as1')):
                obj.our_as1 = None
            dirty = True

    obj.new = False
    obj.changed = obj.activity_changed(orig_as1)
    if dirty:
        obj.put()
    return obj
[docs]
@staticmethod
def from_request():
    """Creates and returns an :class:`Object` from form-encoded JSON parameters.

    Reads the current Flask request's form.

    Parameters:
      obj_id (str): id of an existing :class:`models.Object` to fetch and
        return
      *: If ``obj_id`` is unset, ``id`` and ``source_protocol`` are used as
        is, and ``as2``, ``bsky``, ``mf2``, ``our_as1``, ``nostr``, and
        ``raw`` are JSON-decoded, as properties for a new, unstored
        :class:`models.Object`. If that object has no key, its key is taken
        from its AS1 ``id``, if any.

    Returns:
      Object:
    """
    form = request.form

    obj_id = form.get('obj_id')
    if obj_id:
        return Object.get_by_id(obj_id)

    props = {}
    for field in ('id', 'source_protocol'):
        props[field] = form.get(field)

    for json_prop in ('as2', 'bsky', 'mf2', 'our_as1', 'nostr', 'raw'):
        encoded = form.get(json_prop)
        if encoded:
            props[json_prop] = json_loads(encoded)

    obj = Object(**props)
    if not obj.key and obj.as1:
        inferred_id = obj.as1.get('id')
        if inferred_id:
            obj.key = ndb.Key(Object, inferred_id)

    return obj
[docs]
def to_request(self):
    """Returns a query parameter dict representing this :class:`Object`.

    Populated JSON properties are serialized with sorted keys.
    ``source_protocol`` and ``id`` (the key id) are included when set.

    Returns:
      dict: maps str parameter name to str value
    """
    form = {
        json_prop: json_dumps(val, sort_keys=True)
        for json_prop in ('as2', 'bsky', 'mf2', 'nostr', 'our_as1', 'raw')
        if (val := getattr(self, json_prop, None))
    }

    source_protocol = self.source_protocol
    if source_protocol:
        form['source_protocol'] = source_protocol

    if self.key:
        form['id'] = self.key.id()

    return form
[docs]
def activity_changed(self, other_as1):
    """Returns True if this activity is meaningfully changed from ``other_as1``.

    ...otherwise False. If either this object's AS1 or ``other_as1`` is
    empty, they count as changed only if exactly one of them is empty.

    Used to populate :attr:`changed`.

    Args:
      other_as1 (dict): AS1 object, or none

    Returns:
      bool:
    """
    if not (self.as1 and other_as1):
        return bool(self.as1) != bool(other_as1)

    # ignore inReplyTo since we translate it between protocols
    return as1.activity_changed(self.as1, other_as1, inReplyTo=False)
[docs]
def actor_link(self, image=True, sized=False, user=None):
    """Returns a pretty HTML link with the actor's name and picture.

    The actor's name, URL, and picture URL come from the object's AS1, so
    they're HTML-escaped before being interpolated into the returned
    markup.

    TODO: unify with :meth:`User.html_link`?

    Args:
      image (bool): whether to include an ``img`` tag with the actor's picture
      sized (bool): whether to set an explicit (``width=32``) size on the
        profile picture ``img`` tag
      user (User): current user

    Returns:
      str: HTML, or the empty string if this object has no actor or author
    """
    from html import escape

    attrs = {'class': 'h-card u-author'}

    if user and user.key in self.users:
        # outbound; show a nice link to the user
        return user.html_link(handle=False, pictures=True)

    proto = self.owner_protocol()

    actor = None
    # as1 is a property that converts on every access, so evaluate it once
    if self_as1 := self.as1:
        actor = (as1.get_object(self_as1, 'actor')
                 or as1.get_object(self_as1, 'author'))
        # hydrate from datastore if available
        # TODO: optimize! this is called serially in loops, eg in home.html
        if set(actor.keys()) == {'id'} and proto:
            actor_obj = proto.load(actor['id'], remote=False)
            if actor_obj and actor_obj.as1:
                actor = actor_obj.as1

    if not actor:
        return ''
    elif set(actor.keys()) == {'id'}:
        # all we have is an id, so link to that
        return common.pretty_link(actor['id'], attrs=attrs, user=user)

    url = as1.get_url(actor)
    name = actor.get('displayName') or actor.get('username') or ''
    img_url = util.get_url(actor, 'image')
    if not image or not img_url:
        return common.pretty_link(url, text=name, attrs=attrs, user=user)

    logo = ''
    if proto:
        # label the logo with the protocol's name, like User.html_link does
        logo = f'<span class="logo" title="{proto.__name__}">{proto.LOGO_HTML or proto.LOGO_EMOJI}</span>'

    # truncate the name first, then escape, so we never cut an entity in half
    return f"""\
{logo}
<a class="h-card u-author" href="{escape(url or '')}" title="{escape(name)}">
<img class="profile" src="{escape(img_url)}" {'width="32"' if sized else ''}/>
<span style="unicode-bidi: isolate">{escape(util.ellipsize(name, chars=40))}</span>
</a>"""
[docs]
def get_copy(self, proto):
    """Returns the id for the copy of this object in a given protocol.

    ...or None if no such copy exists. This is the first element of
    :meth:`get_copies`, so if ``proto`` is ``source_protocol``, it's this
    object's key id.

    TODO: for some protocols, we should try harder to find the *right* copy
    id. Eg if copies has some old garbage entries for this protocol, and we
    can tell that they don't belong to the user's copy account in this
    protocol, eg if the DID in the at:// URI doesn't match, we should skip
    those and look for the matching copy. We'd need the user here though.

    This would help with or fix:
    https://console.cloud.google.com/errors/detail/COK22a6w4O2JVg;locations=global;time=P30D?project=bridgy-federated

    Args:
      proto: :class:`Protocol` subclass

    Returns:
      str or None:
    """
    found = self.get_copies(proto)
    if not found:
        return None

    return found[0]
[docs]
def get_copies(self, proto):
    """Returns all ids of copies of this object in a given protocol.

    If ``proto`` is ``source_protocol``, returns a single-element list with
    this object's key id. Protocols are matched by either label or
    abbreviation.

    Args:
      proto: :class:`Protocol` subclass

    Returns:
      list of str:
    """
    labels = (proto.LABEL, proto.ABBREV)

    if self.source_protocol in labels:
        return [self.key.id()]

    return [target.uri for target in self.copies if target.protocol in labels]
[docs]
def resolve_ids(self):
    """Replaces "copy" ids, subdomain ids, etc with their originals.

    The end result is that all ids are original "source" ids, ie in the
    protocol that they first came from.

    Specifically, resolves:

    * ids in :class:`User.copies` and :class:`Object.copies`, eg ATProto
      records and Nostr events that we bridged, to the ids of their
      original objects in their source protocol, eg
      ``at://did:plc:abc/app.bsky.feed.post/123`` => ``https://mas.to/@user/456``.
    * Bridgy Fed subdomain URLs to the ids embedded inside them, eg
      ``https://bsky.brid.gy/ap/did:plc:xyz`` => ``did:plc:xyz``
    * ATProto bsky.app URLs to their DIDs or `at://` URIs, eg
      ``https://bsky.app/profile/a.com`` => ``did:plc:123``

    ...in these AS1 fields, in place:

    * ``id``
    * ``actor``
    * ``author``
    * ``object``
    * ``object.actor``
    * ``object.author``
    * ``object.id``
    * ``object.inReplyTo``
    * ``attachments.[objectType=note].id``
    * ``tags.[objectType=mention].url``

    The result is stored in :attr:`our_as1`, but only if something changed.
    This method doesn't write this object to the datastore itself.

    :meth:`protocol.Protocol.translate_ids` is partly the inverse of this.
    Much of the same logic is duplicated there!

    TODO: unify with :meth:`normalize_ids`, :meth:`Object.normalize_ids`.
    """
    if not self.as1:
        return

    # extract ids, strip Bridgy Fed subdomain URLs
    outer_obj = unwrap(self.as1)
    if outer_obj != self.as1:
        self.our_as1 = util.trim_nulls(outer_obj)

    # without a known source protocol we can't tell whether it has copies
    self_proto = PROTOCOLS.get(self.source_protocol)
    if not self_proto:
        return

    logger.debug(f'Resolving ids for {self.key.id()}')
    inner_obj = outer_obj['object'] = as1.get_object(outer_obj)
    replaced = False

    def replace(val, orig_fn):
        """Returns val with its id replaced by the original's id, if any.

        Args:
          val (str or dict): id, or object with ``id`` field
          orig_fn (callable): takes a str id, returns the original's key or
            a falsy value if there is none
        """
        id = val.get('id') if isinstance(val, dict) else val
        if not id or not self_proto.HAS_COPIES:
            # NOTE(review): this returns the bare id, not val, so a dict val
            # collapses to its id (or None) here. confirm that's intended.
            return id

        orig = orig_fn(id)
        if not orig:
            return val

        nonlocal replaced
        replaced = True
        logger.debug(f'Resolved copy id {val} to original {orig.id()}')

        # keep the dict if it has more fields than just id, otherwise
        # collapse it to the bare id
        if isinstance(val, dict) and util.trim_nulls(val).keys() > {'id'}:
            val['id'] = orig.id()
            return val
        else:
            return orig.id()

    # actually replace ids
    #
    # object field could be either object (eg repost) or actor (eg follow)
    # TODO: handle better
    # https://github.com/snarfed/bridgy-fed/issues/2281
    outer_obj['object'] = replace(inner_obj, get_original_object_key)
    if not replaced:
        outer_obj['object'] = replace(inner_obj, get_original_user_key)

    for obj in outer_obj, inner_obj:
        for tag in as1.get_objects(obj, 'tags'):
            if tag.get('objectType') == 'mention':
                tag['url'] = replace(tag.get('url'), get_original_user_key)
        for att in as1.get_objects(obj, 'attachments'):
            if att.get('objectType') == 'note':
                att['id'] = replace(att.get('id'), get_original_object_key)
        for field, fn in (
                ('actor', get_original_user_key),
                ('author', get_original_user_key),
                ('inReplyTo', get_original_object_key),
        ):
            obj[field] = [replace(val, fn) for val in util.get_list(obj, field)]
            # unwrap single-element lists back to a bare value
            if len(obj[field]) == 1:
                obj[field] = obj[field][0]

    if replaced:
        self.our_as1 = util.trim_nulls(outer_obj)
[docs]
def normalize_ids(self):
    """Normalizes ids to their protocol's canonical representation, if any.

    For example, normalizes ATProto ``https://bsky.app/...`` URLs to DIDs
    for profiles, ``at://`` URIs for posts.

    Covers ``actor``, ``author``, ``inReplyTo``, and mention tag ``url``
    fields in the outer object and its inner ``object`` values, plus the
    inner objects' own ids.

    Modifies this object in place: works on a deep copy of :attr:`as1` and
    stores it in :attr:`our_as1`, but only if at least one id changed.
    Doesn't write this object to the datastore itself.

    TODO: unify with :meth:`resolve_ids`, :meth:`Protocol.translate_ids`.
    """
    from protocol import Protocol

    if not self.as1:
        return

    logger.debug(f'Normalizing ids for {self.key.id()}')
    outer_obj = copy.deepcopy(self.as1)
    inner_objs = as1.get_objects(outer_obj)
    replaced = False

    def replace(val, translate_fn):
        """Returns val with its id normalized, or unchanged if there's no need.

        Args:
          val (str or dict): id, or object with ``id`` field
          translate_fn (callable): called with ``id``, ``from_``, and ``to``
            kwargs, returns the translated id
        """
        nonlocal replaced

        orig = val.get('id') if isinstance(val, dict) else val
        if not orig:
            return val

        # remote=False is passed so that this lookup presumably stays local
        proto = Protocol.for_id(orig, remote=False)
        if not proto:
            return val

        # translating from a protocol to itself normalizes the id
        translated = translate_fn(id=orig, from_=proto, to=proto)
        if translated and translated != orig:
            # logger.debug(f'Normalized {proto.LABEL} id {orig} to {translated}')
            replaced = True
            if isinstance(val, dict):
                val['id'] = translated
                return val
            else:
                return translated

        return val

    # actually replace ids
    for obj in [outer_obj] + inner_objs:
        for tag in as1.get_objects(obj, 'tags'):
            if tag.get('objectType') == 'mention':
                tag['url'] = replace(tag.get('url'), ids.translate_user_id)
        for field in ['actor', 'author', 'inReplyTo']:
            fn = (ids.translate_object_id if field == 'inReplyTo'
                  else ids.translate_user_id)
            obj[field] = [replace(val, fn) for val in util.get_list(obj, field)]
            # unwrap single-element lists back to a bare value
            if len(obj[field]) == 1:
                obj[field] = obj[field][0]

    # rebuild the object field from the normalized inner objects
    outer_obj['object'] = []
    for inner_obj in inner_objs:
        # for verbs whose object is an actor, translate it as a user id
        translate_fn = ids.translate_object_id
        if as1.object_type(outer_obj) in as1.VERBS_WITH_ACTOR_OBJECT:
            translate_fn = ids.translate_user_id

        got = replace(inner_obj, translate_fn)
        # collapse objects that only have an id down to the bare id
        if isinstance(got, dict) and util.trim_nulls(got).keys() == {'id'}:
            got = got['id']

        outer_obj['object'].append(got)

    if len(outer_obj['object']) == 1:
        outer_obj['object'] = outer_obj['object'][0]

    if replaced:
        self.our_as1 = util.trim_nulls(outer_obj)
[docs]
def owner_protocol(self):
    """Wrapper around :attr:`source_protocol` that handles :class:`UIProtocol`.

    Returns:
      Protocol subclass: the protocol for :attr:`source_protocol` *unless*
      it's None or ``ui``, in which case infers and returns the protocol of
      this object's AS1 owner, ie ``author`` or ``actor``, instead.
    """
    from protocol import Protocol

    label = self.source_protocol
    if label not in (None, 'ui'):
        return PROTOCOLS.get(label)

    return Protocol.for_id(as1.get_owner(self.as1))
@cached_property
def domain_blocklist(self):
    """Returns the domains in the domain blocklist in :attr:`raw` or :attr:`csv`.

    If :attr:`raw` is populated, returns its values with ``#`` comments and
    surrounding whitespace removed, lower cased. Otherwise extracts the
    ``domain`` or ``#domain`` column from :attr:`csv`, skipping empty values
    and ``DOMAIN_BLOCKLIST_CANARIES``.

    TODO: unify with :meth:`filters.blocklist_items`

    Returns:
      list of str: domain names, or empty list if neither :attr:`raw` nor
      :attr:`csv` is populated or parseable.

    Raises:
      AssertionError: if both :attr:`raw` and :attr:`csv` are populated
    """
    assert not (self.raw and self.csv)

    if self.raw:
        return [val.split('#')[0].strip().lower() for val in self.raw]

    if not self.csv:
        return []

    # DictReader parses lazily: the header is read on first access to
    # fieldnames and rows are read during iteration. both can raise
    # csv.Error, so both need to be inside the try.
    try:
        reader = csv.DictReader(io.StringIO(self.csv))
        fieldnames = reader.fieldnames or ()
        if 'domain' in fieldnames:
            col = 'domain'
        elif '#domain' in fieldnames:
            col = '#domain'
        else:
            return []

        return [row[col] for row in reader
                if row[col] and row[col] not in DOMAIN_BLOCKLIST_CANARIES]
    except csv.Error:
        return []
[docs]
def domain_blocklist_matches(self, user_or_id):
    """Returns True if ``user_or_id`` is in this domain blocklist, False otherwise.

    For users, looks at id, handle, and delivery target. A domain matches
    if it, or one of its parent domains, is in :attr:`domain_blocklist`.
    Domains in ``domains.DOMAINS``, and their subdomains, never match.

    Args:
      user_or_id (User or str)

    Returns:
      bool:

    Raises:
      AssertionError: if this object is not a domain blocklist
    """
    assert self.is_csv or self.csv or isinstance(self.raw, list)

    if isinstance(user_or_id, User):
        user = user_or_id
        candidates = [user.key.id(), user.handle_as_domain]
        if user.obj:
            candidates.append(user.target_for(user.obj))
    else:
        candidates = [user_or_id]

    for candidate in candidates:
        if domain := util.domain_from_link(candidate):
            if (util.domain_or_parent_in(domain, self.domain_blocklist)
                    and not util.domain_or_parent_in(domain, domains.DOMAINS)):
                logger.info(f'{candidate} matches domain blocklist {self.key.id()}')
                return True

    return False
[docs]
class Follower(ndb.Model):
    """A follower of a Bridgy Fed user.

    Represents a single follow relationship: the :attr:`from_` user follows
    the :attr:`to` user.
    """
    # allowed values for status and reason, respectively
    STATUSES = ('active', 'inactive', 'dormant')
    REASONS = ('requested', 'bounce')

    # stored under the property name 'from', which is a reserved word in
    # Python, hence the trailing underscore on the attribute
    from_ = ndb.KeyProperty(name='from', required=True)
    """The follower."""
    to = ndb.KeyProperty(required=True)
    """The followee, ie the user being followed."""

    follow = ndb.KeyProperty(Object)
    """The last follow activity."""
    status = ndb.StringProperty(choices=STATUSES, default='active')
    """Whether this follow is active or not.
    ``dormant`` means the followee isn't bridged (yet), so the follow can't be
    delivered. If they enable the bridge, we notify the follower.
    """
    reason = ndb.StringProperty(choices=REASONS)
    """Optional explanation for this follow's :attr:`status`, eg why it's
    dormant. One of :attr:`REASONS`."""

    created = ndb.DateTimeProperty(auto_now_add=True)
    updated = ndb.DateTimeProperty(auto_now=True)

    # OLD. some stored entities still have these; do not reuse.
    # src = ndb.StringProperty()
    # dest = ndb.StringProperty()
    # last_follow = JsonProperty()
def _pre_put_hook(self):
    """Checks that the follower and followee keys have different kinds.

    Raises:
      AssertionError: if :attr:`from_` and :attr:`to` have the same kind
    """
    # we're a bridge! stick with bridging.
    assert self.from_.kind() != self.to.kind(), f'from {self.from_} to {self.to}'
def _post_put_hook(self, future):
    """Logs this Follower's key at debug level.

    Args:
      future: not used here
    """
    logger.debug(f'Wrote {self.key}')
[docs]
@classmethod
def get_or_create(cls, *, from_, to, **kwargs):
    """Returns a Follower with the given ``from_`` and ``to`` users.

    If a matching :class:`Follower` doesn't exist in the datastore, creates
    it with the given property values. If one does exist, it's updated with
    them. Either way, the Follower is written before it's returned.

    The one exception: if ``status='dormant'`` is passed and the existing
    Follower is ``active``, its status is left alone. We never downgrade an
    active Follower to dormant.

    Not transactional because transactions don't read or write memcache. :/
    Fortunately we don't really depend on atomicity for much, last writer wins
    is usually fine.

    Args:
      from_ (User or Key)
      to (User or Key)
      kwargs: property values to set on the Follower

    Returns:
      Follower:
    """
    def as_key(user_or_key):
        """Returns the key itself, or the given user's key."""
        if isinstance(user_or_key, ndb.Key):
            return user_or_key
        return user_or_key.key

    from_key = as_key(from_)
    to_key = as_key(to)
    assert from_key
    assert to_key

    query = Follower.query(Follower.from_ == from_key, Follower.to == to_key)
    follower = query.get()

    if not follower:
        follower = Follower(from_=from_key, to=to_key, **kwargs)
    else:
        # update existing entity with new property values, eg to make an
        # inactive Follower active again
        for prop, val in kwargs.items():
            is_downgrade = (prop == 'status' and val == 'dormant'
                            and follower.status == 'active')
            # don't downgrade an active Follower to dormant
            if not is_downgrade:
                setattr(follower, prop, val)

    follower.put()
    return follower
[docs]
@staticmethod
def fetch_page(collection, user):
r"""Fetches a page of :class:`Follower`\s for a given user.
Wraps :func:`fetch_page`. Paging uses the ``before`` and ``after`` query
parameters, if available in the request.
Args:
collection (str): ``followers`` or ``following``
user (User)
Returns:
(list of Follower, str, str) tuple: results, annotated with an extra
``user`` attribute that holds the follower or following :class:`User`,
and new str query param values for ``before`` and ``after`` to fetch
the previous and next pages, respectively
"""
assert collection in ('followers', 'following'), collection
filter_prop = Follower.to if collection == 'followers' else Follower.from_
query = Follower.query(
Follower.status == 'active',
filter_prop == user.key,
)
followers, before, after = fetch_page(query, Follower, by=Follower.updated)
users = ndb.get_multi(f.from_ if collection == 'followers' else f.to
for f in followers)
User.load_multi(u for u in users if u)
for f, u in zip(followers, users):
f.user = u
followers = [f for f in followers if f.user]
# only show followers in protocols that this user is bridged into
if collection == 'followers':
followers = [f for f in followers if user.is_enabled(f.user)]
return followers, before, after
[docs]
def fetch_objects(query, by=None, user=None, max_age=None):
    """Fetches a page of :class:`Object` entities from a datastore query.

    Wraps :func:`fetch_page` and adds attributes to the returned
    :class:`Object` entities for rendering in ``objects.html``: ``phrase``,
    ``content``, ``url``, and, for likes, follows, reposts, shares and objects
    without content, ``inner_url``.

    Non-public and deleted objects are dropped after the page is fetched, so a
    page may contain fewer results than :func:`fetch_page` returned.

    Args:
      query (ndb.Query)
      by (ndb.model.Property): either :attr:`Object.updated` or
        :attr:`Object.created`
      user (User): current user
      max_age (datetime.timedelta): passed through to :func:`fetch_page`

    Returns:
      (list of Object, str, str) tuple:
      (results, new ``before`` query param, new ``after`` query param)
      to fetch the previous and next pages, respectively
    """
    assert by is Object.updated or by is Object.created

    objects, new_before, new_after = fetch_page(query, Object, by=by,
                                                max_age=max_age)
    objects = [o for o in objects if as1.is_public(o.as1) and not o.deleted]

    # AS1 verb or object type => human-readable phrase. doesn't depend on the
    # object, so build it once per call instead of once per object.
    phrases = {
        'accept': 'accepted',
        'article': 'posted',
        'comment': 'replied',
        'delete': 'deleted',
        'follow': 'followed',
        'invite': 'is invited to',
        'issue': 'filed issue',
        'like': 'liked',
        'note': 'posted',
        'post': 'posted',
        'repost': 'reposted',
        'rsvp-interested': 'is interested in',
        'rsvp-maybe': 'might attend',
        'rsvp-no': 'is not attending',
        'rsvp-yes': 'is attending',
        'share': 'reposted',
        'stop-following': 'unfollowed',
        'undo': 'undid',
        'update': 'updated',
    }
    phrases.update({actor_type: 'profile refreshed:'
                    for actor_type in as1.ACTOR_TYPES})

    # synthesize human-friendly content for objects
    for obj in objects:
        obj_as1 = obj.as1
        obj_type = as1.object_type(obj_as1)
        obj.phrase = phrases.get(obj_type, '')

        raw_content = (obj_as1.get('content')
                       or obj_as1.get('displayName')
                       or obj_as1.get('summary'))
        # plain text version, used below as link text
        content = raw_content
        if content:
            content = util.parse_html(content).get_text()

        urls = as1.object_urls(obj_as1)
        url = urls[0] if urls else None
        if url and not content:
            # heuristics for sniffing URLs and converting them to more friendly
            # phrases and user handles.
            # TODO: standardize this into granary.as2 somewhere?
            #
            # imported here, not at module level; presumably to avoid a
            # circular import - confirm before moving
            from activitypub import FEDI_URL_RE
            from atproto import COLLECTION_TO_TYPE, did_to_handle

            handle = suffix = ''
            if match := FEDI_URL_RE.match(url):
                handle = match.group('handle')
                if match.group('post_id'):
                    suffix = "'s post"
            elif match := BSKY_APP_URL_RE.match(url):
                handle = match.group('id')
                if match.group('tid'):
                    suffix = "'s post"
            elif match := AT_URI_RE.match(url):
                handle = match.group('repo')
                if coll := match.group('collection'):
                    suffix = f"'s {COLLECTION_TO_TYPE.get(coll) or 'post'}"
                url = bluesky.at_uri_to_web_url(url)
            elif url.startswith('did:'):
                handle = url
                url = bluesky.Bluesky.user_url(handle)

            if handle:
                if handle.startswith('did:'):
                    handle = did_to_handle(handle) or handle
                content = f'@{handle}{suffix}'

            # url may have been rewritten above, so check it again
            if url:
                content = common.pretty_link(url, text=content, user=user)

        obj.content = raw_content
        obj.url = as1.get_url(obj_as1)

        if obj_type in ('like', 'follow', 'repost', 'share') or not obj.content:
            inner_as1 = as1.get_object(obj_as1)
            obj.inner_url = as1.get_url(inner_as1) or inner_as1.get('id')
            if obj.url:
                obj.phrase = common.pretty_link(
                    obj.url, text=obj.phrase, attrs={'class': 'u-url'}, user=user)
            if content:
                obj.content = content
                obj.url = url
            elif obj.inner_url:
                obj.content = common.pretty_link(obj.inner_url, max_length=50)

    return objects, new_before, new_after
[docs]
def hydrate(activity, fields=('author', 'actor', 'object')):
    """Hydrates fields in an AS1 activity, in place.

    Only fields whose value is a bare reference, ie an object with just an
    ``id``, are looked up. Fields that already hold a fuller object, or are
    missing, are left alone. A field is only replaced if the lookup finds a
    stored object with a non-empty AS1 representation.

    Args:
      activity (dict): AS1 activity
      fields (sequence of str): names of fields to hydrate. If they're string ids,
        loads them from the datastore, if possible, and replaces them with their dict
        AS1 objects.

    Returns:
      sequence of :class:`google.cloud.ndb.tasklets.Future`: tasklets for hydrating
      each field. Wait on these before using ``activity``.
    """
    def _hydrate(field):
        # factory, so that each callback is bound to its own field name
        def maybe_set(future):
            loaded = future.result()
            if loaded and loaded.as1:
                activity[field] = loaded.as1
        return maybe_set

    futures = []

    for field in fields:
        val = as1.get_object(activity, field)
        # non-empty and no keys other than id, ie exactly {'id': ...}
        if val and val.keys() <= {'id'}:
            # TODO: extract a Protocol class method out of User.profile_id,
            # then use that here instead. the catch is that we'd need to
            # determine Protocol for every id, which is expensive.
            #
            # same TODO is in models.fetch_objects
            obj_id = val['id']
            if obj_id.startswith('did:'):
                obj_id = f'at://{obj_id}/app.bsky.actor.profile/self'

            future = Object.get_by_id_async(obj_id)
            future.add_done_callback(_hydrate(field))
            futures.append(future)

    return futures
[docs]
def fetch_page(query, model_class, by=None, max_age=None):
    """Fetches a page of results from a datastore query.

    Reads the ``before`` and ``after`` query params from the current request (if
    provided; should be ISO8601 timestamps) and filters and orders ``query`` on
    the ``by`` property to select the page. Returns at most ``PAGE_SIZE``
    results, newest ``updated`` first.

    Args:
      query (google.cloud.ndb.query.Query)
      model_class (class)
      by (ndb.model.Property): paging property, eg :attr:`Object.updated`
        or :attr:`Object.created`
      max_age (datetime.timedelta): if provided, reject ``before``/``after``
        params older than this, and don't generate paging links past it

    Returns:
      (list of Object or Follower, str, str) tuple: (results, new_before,
      new_after), where new_before and new_after are query param values for
      ``before`` and ``after`` to fetch the previous and next pages,
      respectively
    """
    assert by

    # naive UTC, to compare against the naive datetimes used below
    now = util.now().replace(tzinfo=None)

    # if there's a paging param ('before' or 'after'), update query with it
    # TODO: unify this with Bridgy's user page
    def read_paging_param(param):
        val = request.values.get(param)
        if not val:
            return None

        try:
            # a + in a query string decodes to a space, eg in a +00:00 offset;
            # presumably that's what this undoes
            parsed = util.parse_iso8601(val.replace(' ', '+'))
        except BaseException as e:
            error(f"Couldn't parse {param}, {val!r} as ISO8601: {e}")

        if parsed.tzinfo:
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)

        if max_age and now - parsed > max_age:
            error(f'{param} is too old')

        return parsed

    before = read_paging_param('before')
    after = read_paging_param('after')

    if before and after:
        error("can't handle both before and after")
    elif after:
        # ascending, so that we get the page immediately after the cursor
        query = query.filter(by >= after).order(by)
    elif before:
        query = query.filter(by < before).order(-by)
    else:
        query = query.order(-by)

    query_iter = query.iter()
    # NOTE(review): sorting and the cursors below always use .updated, even
    # when by is a different property, eg Object.created. confirm that's
    # intended.
    results = list(itertools.islice(query_iter, 0, PAGE_SIZE))
    results.sort(key=lambda entity: entity.updated, reverse=True)

    # calculate new paging param(s)
    has_next = results and query_iter.probably_has_next()

    if before:
        new_after = before
    elif has_next and after:
        new_after = results[0].updated
    else:
        new_after = None

    if after:
        new_before = after
    elif has_next:
        new_before = results[-1].updated
    else:
        new_before = None

    if new_before and max_age and now - new_before > max_age:
        # don't link to pages older than the max paging age
        new_before = None

    if new_after:
        new_after = new_after.isoformat()
    if new_before:
        new_before = new_before.isoformat()

    return results, new_before, new_after
[docs]
def load_user(handle_or_id, proto=None, create=False, allow_opt_out=False,
              raise_=False):
    """Loads a user by handle or id.

    Tries ``handle_or_id`` as a user id first, then as a handle. A leading ``@``
    is stripped and the lookup retried, with the same options, if the value
    isn't recognized as is.

    Args:
      handle_or_id (str): user handle or id
      proto (Protocol subclass or None): protocol to use. If None, will try to
        determine protocol via Protocol.for_id and Protocol.for_handle
      create (bool): if True, use get_or_create; if False, use get_by_id
      allow_opt_out (bool): whether to return a user if they're currently opted out
      raise_ (bool): passed through to ``proto.get_or_create`` when ``create``
        is True. NOTE(review): presumably forwarded on to
        :meth:`User.reload_profile` to control whether profile fetch failures
        raise; confirm against ``get_or_create``.

    Returns:
      User:

    Raises:
      RuntimeError: if no matching user was found
    """
    # imported here, not at module level; presumably to avoid a circular
    # import - confirm before moving
    import protocol

    logger.debug(f'loading {handle_or_id}')

    if not proto or proto is protocol.Protocol:
        if not (proto := protocol.Protocol.for_id(handle_or_id)):
            proto, user_id = protocol.Protocol.for_handle(handle_or_id)
            if user_id:
                handle_or_id = user_id
            if not proto:
                if handle_or_id.startswith('@'):
                    return load_user(handle_or_id.removeprefix('@'), create=create,
                                     allow_opt_out=allow_opt_out, raise_=raise_)
                raise RuntimeError(f"Couldn't determine network for {handle_or_id}")

    # owns_id may return None, ie not sure, which we treat as a possible id
    if proto.owns_id(handle_or_id) is not False:
        if proto.LABEL == 'web' and util.is_web(handle_or_id):
            if not util.is_homepage(handle_or_id):
                raise RuntimeError(f"{handle_or_id} isn't a web domain or homepage URL")

        # TODO: handle user vs object ids here. this incorrectly assumes that it's
        # a user id. https://github.com/snarfed/bridgy-fed/issues/2281
        user_id = ids.normalize_user_id(id=handle_or_id, proto=proto)
        user = (proto.get_or_create(user_id, allow_opt_out=allow_opt_out,
                                    raise_=raise_)
                if create
                else proto.get_by_id(user_id, allow_opt_out=allow_opt_out))
        if not user:
            raise RuntimeError(f"Couldn't load {handle_or_id} on {proto.PHRASE}")
        return user

    logger.debug(f"doesn't look like a {proto.LABEL} user ID, trying as a handle")
    if proto.owns_handle(handle_or_id) is False:
        if handle_or_id.startswith('@'):
            return load_user(handle_or_id.removeprefix('@'), create=create,
                             proto=proto, allow_opt_out=allow_opt_out,
                             raise_=raise_)
        raise RuntimeError(f"{handle_or_id} doesn't look like a user id or handle on {proto.PHRASE}")

    for user in proto.query(ndb.OR(proto.handle == handle_or_id,
                                   proto.handle_as_domain == handle_or_id)):
        # some users may have an old handle stored and indexed, but they've changed
        # their handle since then, so check again in memory
        if user.handle == handle_or_id or user.handle_as_domain == handle_or_id:
            if user.use_instead:
                logger.debug(f'{user.key} use_instead => {user.use_instead}')
                user = user.use_instead.get()
            if (not user.status and user.enabled_protocols) or allow_opt_out:
                return user

    if create:
        user_id = proto.handle_to_id(handle_or_id)
        if not user_id:
            raise RuntimeError(f"{handle_or_id} doesn't look like a handle on {proto.PHRASE}")
        user = proto.get_or_create(user_id, allow_opt_out=allow_opt_out,
                                   raise_=raise_)
        # only return a newly created user if we got a usable profile for them
        if user and user.obj and user.obj.as1:
            return user

    raise RuntimeError(f"Couldn't find bridged {proto.LABEL} account {handle_or_id}")
[docs]
def maybe_truncate_key_id(id):
    """Returns id, truncated to ``_MAX_KEYPART_BYTES`` bytes if it's longer.

    Length is measured on the UTF-8 encoding, not in characters. Ids that
    already fit are returned as is. Truncations are logged as warnings.

    Args:
      id (str)

    Returns:
      str:
    """
    as_bytes = id.encode('utf-8')
    if len(as_bytes) <= _MAX_KEYPART_BYTES:
        return id

    # the cut may land inside a multi-byte character; errors='ignore' drops
    # the incomplete trailing bytes instead of raising
    truncated = as_bytes[:_MAX_KEYPART_BYTES].decode('utf-8', errors='ignore')
    logger.warning(f'Truncating id {id} to {_MAX_KEYPART_BYTES} bytes: {truncated}')
    return truncated