"""UI pages."""
import copy
from datetime import timedelta
from functools import wraps
import html
import itertools
import logging
import re
import time
import arroba.server
from flask import request
from google.cloud import ndb
from google.cloud.ndb import tasklets
from google.cloud.ndb.key import Key
from google.cloud.ndb.query import OR
from google.cloud.ndb.model import get_multi, Model
from granary import as1, as2, atom, microformats2, rss
import jwt
import lexrpc
import oauth_dropins
from oauth_dropins.bluesky import BlueskyAuth
import requests
from webutil import flask_util, logs, util
from webutil.util import json_dumps
from webutil.flask_util import (
canonicalize_request_domain,
error,
flash,
get_flashed_messages,
get_required_param,
Found,
MovedPermanently,
)
import werkzeug.exceptions
from werkzeug.exceptions import NotFound
import activitypub
from activitypub import ActivityPub
import atproto
from atproto import ATProto, MAIN_PDS_DOMAINS
import common
from common import (
CACHE_CONTROL,
disable_if_read_only,
ErrorButDoNotRetryTask,
render_template,
secret_key_auth,
verify_jwt,
)
from domains import (
BLOG_REDIRECT_DOMAINS,
DOMAIN_RE,
DomainBlocklist,
KNOWN_DOMAIN_BLOCKLISTS,
PRIMARY_DOMAIN,
PROTOCOL_DOMAINS,
)
from farcaster import Farcaster
from flask_app import app
from flask import redirect
import ids
import memcache
import models
from models import (
fetch_objects,
fetch_page,
Follower,
Object,
PAGE_SIZE,
PROTOCOLS,
USER_STATUS_DESCRIPTIONS,
)
from nostr import Nostr
from protocol import Protocol
from web import Web
import webfinger
logger = logging.getLogger(__name__)
IFRAMELY_API_KEY_MD5 = util.read('iframely_api_key_md5')
TEMPLATE_VARS = {
'ActivityPub': ActivityPub,
'as1': as1,
'as2': as2,
'ATProto': ATProto,
'Farcaster': Farcaster,
'getattr': getattr,
'hasattr': hasattr,
'ids': ids,
'IFRAMELY_API_KEY_MD5': IFRAMELY_API_KEY_MD5,
'logs': logs,
'Nostr': Nostr,
'PROTOCOLS': PROTOCOLS,
'Web': Web,
}
# how far back the before/after paging links and params are allowed to go
# currently unused
PAGING_MAX_AGE = timedelta(days=30)
# precompute this because we get a ton of requests for non-existing users
# from weird open redirect referrers:
# https://github.com/snarfed/bridgy-fed/issues/422
with app.test_request_context('/'):
USER_NOT_FOUND_HTML = render_template('user_not_found.html', **TEMPLATE_VARS)
[docs]
def load_user(proto, id):
"""Loads and returns the current request's user.
Args:
protocol (str):
id (str):
Returns:
models.User:
Raises:
:class:`werkzeug.exceptions.HTTPException` on error or redirect
"""
assert id
if id in PROTOCOL_DOMAINS:
error(f'{proto} user {id} not found', status=404)
try:
user = models.load_user(id, proto=PROTOCOLS[proto], create=False)
except (AttributeError, RuntimeError, ValueError) as err:
error(str(err), status=404)
if user.enabled_protocols or user.DEFAULT_SERVE_USER_PAGES:
assert not user.use_instead
return user
# TODO: switch back to USER_NOT_FOUND_HTML
# not easy via exception/abort because this uses Werkzeug's built in
# NotFound exception subclass, and we'd need to make it implement
# get_body to return arbitrary HTML.
error(f'{proto} user {id} not found', status=404)
[docs]
def no_paging(fn):
"""Returns 400 if the request has ``before`` or ``after`` params."""
@wraps(fn)
def wrapper(*args, **kwargs):
if request.values.get('before') or request.values.get('after'):
error('Paging is disabled', status=400)
return fn(*args, **kwargs)
return wrapper
[docs]
def require_login(fn):
"""Decorator that requires and loads the current request's logged in user.
Passes the user in the ``user`` kwarg, as a :class:`models.User`.
HTTP POST params:
key (str): url-safe ndb key
Raises:
:class:`werkzeug.exceptions.HTTPException` on error or redirect
"""
@wraps(fn)
def wrapper(*args, **kwargs):
key = Key(urlsafe=get_required_param('key'))
if key not in [login_to_user_key(l) for l in get_logins()]:
logger.warning(f'not logged in for {key}')
raise Found(location='/login')
elif not (user := key.get()):
raise Found(location='/login')
return fn(*args, user=user, **kwargs)
return wrapper
[docs]
def require_token(scope, claims=None):
"""Decorator that loads a user and checks that they're authorized by token.
Expects a ``user_id`` kwarg. Loads the matching user and replaces it with a
``user`` kwarg passed to the handler with the loaded :class:`User`.
The per-user JWT should be in the ``token`` query param or form arg.
Must be used *below* :meth:`flask.Flask.route`, eg:
@app.route('/path')
@require_token('respond')
def handler():
...
Args:
scope (str): expected scope that the JWT must match
claims (sequence of str): optional additional claims that the JWT must match
"""
def decorator(fn):
@wraps(fn)
def decorated(*args, protocol=None, user_id=None, **kwargs):
assert protocol
assert user_id
user = load_user(protocol, user_id)
expected_claims = ({c: flask_util.get_required_param(c) for c in claims}
if claims else {})
try:
verify_jwt(flask_util.get_required_param('token'),
user_id=user.key.id(), scope=scope, **expected_claims)
except jwt.InvalidTokenError as err:
logger.error(err)
error('Bad token', status=401)
except ValueError as err:
logger.error(err)
error('Not authorized', status=403)
return fn(*args, user=user, **kwargs)
return decorated
return decorator
[docs]
def get_logins():
"""Returns the user's current logged in sessions:
Returns:
list of :class:`oauth_dropins.models.BaseAuth`
"""
logins = [l for l in get_multi(oauth_dropins.get_logins()) if l]
return sorted(logins, key=lambda l: (l.key.kind(), l.user_display_name()))
[docs]
def login_to_user_key(login):
""""Converts an oauth-dropins auth entity to a :model:`User` key.
Args:
login (oauth_dropins.models.BaseAuth)
Returns:
ndb.key.Key:
"""
match login.site_name():
case 'Bluesky':
return ATProto(id=login.key.id()).key
case 'Mastodon':
if id := login.actor_id():
return ActivityPub(id=id).key
logger.warning(f'Mastodon auth entity {login.key.id()} has no user_json or uri')
return None
case 'Pixelfed':
user, server = login.key.id().strip('@').split('@')
return ActivityPub(id=f'https://{server}/users/{user}').key
case 'Threads':
username = login.user_display_name()
handle = f'@{username}@threads.net'
if user := ActivityPub.query(ActivityPub.handle == handle).get():
return user.key
if not (actor_id := webfinger.fetch_actor_url(handle)):
for msg in get_flashed_messages:
if 'HTTP 404' in msg:
flash('You need to <a href="https://help.instagram.com/169559812696339">turn on fediverse sharing</a> first.', escape=False)
return None
return ActivityPub(id=actor_id).key
case 'IndieAuth':
return Web(id=util.domain_from_link(login.key.id())).key
case _:
assert False, repr(login)
[docs]
def render(template, **vars):
"""Renders a Jinja2 template and adds our standard template variables.
Args:
template (str): file name
"""
if user := vars.get('user'):
vars['bridged_protos'] = [
proto for proto in set(PROTOCOLS.values())
if proto and not isinstance(user, proto)
and proto.LABEL not in ('ui', 'web')
and user.is_enabled(proto)]
vars = {**TEMPLATE_VARS, **vars}
return render_template(template, logins=get_logins(), **vars)
[docs]
@app.route('/')
@canonicalize_request_domain(PROTOCOL_DOMAINS, PRIMARY_DOMAIN)
@flask_util.headers(CACHE_CONTROL)
def front_page():
"""View for the front page."""
return render('index.html')
[docs]
@app.route('/docs')
@canonicalize_request_domain(PROTOCOL_DOMAINS, PRIMARY_DOMAIN)
@flask_util.headers(CACHE_CONTROL)
def docs():
"""View for the docs page."""
return render('docs.html')
[docs]
@app.route('/login')
@canonicalize_request_domain(PROTOCOL_DOMAINS, PRIMARY_DOMAIN)
@flask_util.headers(CACHE_CONTROL)
@disable_if_read_only
def login():
"""View for the front page."""
return render('login.html')
[docs]
@app.post('/logout')
def logout():
"""Logs the user out of all current login sessions."""
oauth_dropins.logout()
flash(f"OK, you're now logged out.")
return redirect('/')
[docs]
@app.route('/settings')
@canonicalize_request_domain(PROTOCOL_DOMAINS, PRIMARY_DOMAIN)
@disable_if_read_only
def settings():
"""User settings page. Requires logged in session."""
auth_entity = request.args.get('auth_entity')
logged_in_as = Key(urlsafe=auth_entity) if auth_entity else None
def site_logo(login):
return f'/oauth_dropins_static/{login.site_name().lower()}_icon.png'
users = []
logins_and_user_keys = []
for login in get_logins():
if user_key := login_to_user_key(login):
if login.key == logged_in_as:
cls = Model._lookup_model(user_key.kind())
user = cls.get_or_create(id=user_key.id(), allow_opt_out=True,
reload=True)
user.logo = site_logo(login)
users.append(user)
else:
logins_and_user_keys.append((login, user_key))
loaded = get_multi(key for _, key in logins_and_user_keys)
for (login, _), user in zip(logins_and_user_keys, loaded):
if user:
user.logo = site_logo(login)
user.domain_blocklists = []
for obj in ndb.get_multi(user.blocks[:PAGE_SIZE]):
if getattr(obj, 'is_csv', None):
url = obj.key.id()
user.domain_blocklists.append(
KNOWN_DOMAIN_BLOCKLISTS.get(url)
or DomainBlocklist(name=url.split('/')[-1],
about_url=url, csv_url=url))
users.append(user)
if not users:
return redirect('/login')
return render(
'settings.html',
**locals(),
KNOWN_DOMAIN_BLOCKLISTS=KNOWN_DOMAIN_BLOCKLISTS,
USER_STATUS_DESCRIPTIONS=USER_STATUS_DESCRIPTIONS,
)
[docs]
@app.post('/settings/enable')
@disable_if_read_only
@require_login
def enable(user=None):
"""Enables bridging for a given account.
Args:
user (models.User)
"""
enabled = []
for proto in set(PROTOCOLS.values()):
if (proto and not isinstance(user, proto)
# TODO: nostr, farcaster
and proto.LABEL not in ('ui', 'web', 'nostr', 'farcaster')
and not user.is_enabled(proto)):
try:
user.enable_protocol(proto)
except ErrorButDoNotRetryTask as e:
msg = str(e)
if resp := e.get_response():
if resp.is_json:
msg = resp.json['error']
flash(f"Couldn't enable bridging to {proto.PHRASE}: {msg}")
return redirect('/settings')
proto.bot_maybe_follow_back(user)
enabled.append(proto)
if enabled:
flash(f'Now bridging {user.handle_or_id()} to {",".join(p.PHRASE for p in enabled)}.')
else:
flash(f'{user.handle_or_id()} is already bridging.')
return redirect('/settings')
[docs]
@app.post('/settings/disable')
@disable_if_read_only
@require_login
def disable(user=None):
"""Disables bridging for a given account.
Args:
user (models.User)
"""
if not user.enabled_protocols:
flash(f'{user.handle_or_id()} is not currently bridging.')
return redirect('/settings')
enabled = list(user.enabled_protocols)
for proto in user.enabled_protocols:
user.delete(PROTOCOLS[proto])
user.disable_protocol(PROTOCOLS[proto])
flash(f'Disabled bridging {user.handle_or_id()} to {",".join(PROTOCOLS[p].PHRASE for p in enabled)}.')
return redirect('/settings')
[docs]
@app.post('/settings/set-username')
@disable_if_read_only
@require_login
def set_username(user=None):
"""Enables bridging for a given account.
Args:
user (models.User)
Query params:
protocol (str)
username (str)
"""
proto = PROTOCOLS[flask_util.get_required_param('protocol')]
username = flask_util.get_required_param('username')
try:
proto.set_username(user, username)
flash(f"Setting username on {proto.PHRASE} to {username}...")
except NotImplementedError:
flash(f"Custom usernames aren't supported on {proto.PHRASE}.")
except (ValueError, RuntimeError) as e:
flash(f"Couldn't set username on {proto.PHRASE} to {username}: {e}",
escape=False)
return redirect('/settings')
[docs]
@app.post('/settings/block')
@disable_if_read_only
@require_login
def block(user=None):
"""Blocks a user or blocklist.
TODO: unify with :func:`unblock`?
Args:
user (models.User): current logged in user, provided by :func:`require_login`
Query params:
target (str)
"""
target = flask_util.get_required_param('target')
links = []
for arg in target.strip().split():
try:
result = Protocol.block(user, arg)
links.append(result.html_link())
except ValueError as err:
flash(str(err))
if links:
flash(f"""OK, you're now blocking {', '.join(links)}.""", escape=False)
return redirect('/settings')
[docs]
@app.post('/settings/unblock')
@disable_if_read_only
@require_login
def unblock(user=None):
"""Unblocks a user or blocklist.
TODO: unify with :func:`block`?
Args:
user (models.User): current logged in user, provided by :func:`require_login`
Query params:
target (str)
"""
target = flask_util.get_required_param('target')
links = []
for arg in target.strip().split():
try:
result = Protocol.unblock(user, arg)
links.append(result.html_link())
except ValueError as err:
flash(str(err))
if links:
flash(f"""OK, you're not blocking {', '.join(links)}.""", escape=False)
return redirect('/settings')
[docs]
@app.post('/settings/toggle-notifs')
@disable_if_read_only
@require_login
def toggle_notifs(user=None):
"""Toggles DM notifications for a given account.
Args:
user (models.User)
"""
if user.send_notifs == 'all':
user.send_notifs = 'none'
verb = 'disabled'
else:
user.send_notifs = 'all'
verb = 'enabled'
user.put()
flash(f'DM notifications {verb} for {user.handle_or_id()}.')
return redirect('/settings')
[docs]
@app.post('/settings/migrate-to-activitypub')
@disable_if_read_only
@require_login
def migrate_to_activitypub(user=None):
"""Migrates a bridged account out to a native fediverse account.
Duplicates :func:`dms.migrate_to_activitypub` and Bounce's `confirm` and
`migrate_out`. Keep them in sync!
Args:
user (models.User)
Form params:
handle (str): the destination fediverse handle or id
"""
handle = flask_util.get_required_param('handle').strip()
try:
to_user = models.load_user(handle, ActivityPub, create=True,
allow_opt_out=True, raise_=True)
except (AttributeError, RuntimeError, ValueError) as err:
flash(str(err))
return redirect('/settings')
try:
ActivityPub.check_can_migrate_out(user, to_user.key.id())
except activitypub.NeedsAlias:
return render('set_alsoKnownAs.html', user=user, to_user=to_user,
handle=handle)
try:
ActivityPub.migrate_out(user, to_user.key.id())
except ValueError as e:
flash(str(e))
return redirect('/settings')
common.create_task(queue='migrate-out', user=user.key.urlsafe(),
protocol=ActivityPub.LABEL)
flash(f"OK, we'll migrate your bridged account on {ActivityPub.PHRASE} to {to_user.html_link()}.", escape=False)
return redirect('/settings')
[docs]
@app.post('/settings/migrate-to-atproto')
@disable_if_read_only
@require_login
def migrate_to_atproto(user=None):
"""First step of migrating a bridged account out to a new ATProto PDS.
Calls ``describeServer`` on the new PDS, then shows the create account form.
Duplicates :func:`dms.migrate_to_atproto` and Bounce's `confirm` and
`migrate_out`. Keep them in sync!
Args:
user (models.User)
Form params:
pds (str): the new PDS's URL
"""
pds = flask_util.get_required_param('pds').strip()
if not util.is_web(pds):
flash(f"{pds} doesn't look like a PDS domain or URL.")
return redirect('/settings')
pds_domain = util.domain_from_link(pds)
if util.domain_or_parent_in(pds_domain, MAIN_PDS_DOMAINS):
flash(f"Sorry, we can't migrate to {pds_domain}; it <a href='https://docs.bsky.app/blog/incoming-migration'>only allows accounts that it originally created</a>.",
escape=False)
return redirect('/settings')
client = lexrpc.Client(pds, requests_session=util.session)
try:
desc = client.com.atproto.server.describeServer()
except Exception as e:
_, body = util.interpret_http_exception(e)
flash(f"Couldn't connect to {pds}: {body or e}")
return redirect('/settings')
user_domains = desc.get('availableUserDomains')
handle_domain = user_domains[0] if user_domains else ''
if handle_domain and not handle_domain.startswith('.'):
handle_domain = '.' + handle_domain
show_invite_code = desc.get('inviteCodeRequired')
if desc.get('phoneVerificationRequired'):
return render('bluesky_phone_verification.html', user=user, pds=pds,
handle_domain=handle_domain, show_invite_code=show_invite_code)
return render('bluesky_create_account.html', user=user, pds=pds,
handle_domain=handle_domain, show_invite_code=show_invite_code,
show_phone_verification_code=False)
[docs]
@app.post('/settings/migrate-to-atproto/phone-verification')
@disable_if_read_only
@require_login
def migrate_to_atproto_phone_verification(user=None):
"""Requests a phone verification SMS from the new ATProto PDS.
Duplicates :func:`dms.migrate_to_atproto` and Bounce's
`bluesky_phone_verification_post`. Keep them in sync!
Args:
user (models.User)
Form params:
pds (str): the new PDS's URL
phone_number (str)
"""
pds = flask_util.get_required_param('pds')
phone_number = flask_util.get_required_param('phone_number')
handle_domain = request.values.get('handle_domain', '')
show_invite_code = bool(request.values.get('show_invite_code'))
client = lexrpc.Client(pds, requests_session=util.session)
try:
client.com.atproto.temp.requestPhoneVerification({'phoneNumber': phone_number})
except Exception as e:
_, body = util.interpret_http_exception(e)
flash(f'Error requesting phone verification: {body or e}')
return render('bluesky_phone_verification.html', user=user, pds=pds,
phone_number=phone_number, handle_domain=handle_domain,
show_invite_code=show_invite_code)
return render('bluesky_create_account.html', user=user, pds=pds,
handle_domain=handle_domain, show_invite_code=show_invite_code,
show_phone_verification_code=True)
[docs]
@app.post('/settings/migrate-to-atproto/create-account')
@disable_if_read_only
@require_login
def migrate_to_atproto_create_account(user=None):
"""Final step of migrating a bridged account out to a new ATProto PDS.
Creates the account on the new PDS, then migrates the bridged account out.
Args:
user (models.User)
Form params:
pds (str): the new PDS's URL
email (str)
password (str)
handle (str): optional, the handle's first segment, eg ``alice``
handle_domain (str): optional, the new PDS's handle domain, eg ``.pds.com``
invite_code (str): optional
phone_verification_code (str): optional
"""
pds = flask_util.get_required_param('pds')
pds_domain = util.domain_from_link(pds)
email = flask_util.get_required_param('email')
password = flask_util.get_required_param('password')
handle = request.values.get('handle')
handle_domain = request.values.get('handle_domain', '')
invite_code = request.values.get('invite_code')
phone_verification_code = request.values.get('phone_verification_code')
def create_account_form():
return render('bluesky_create_account.html', user=user, pds=pds,
email=email, handle=handle, handle_domain=handle_domain,
invite_code=invite_code, show_invite_code=bool(invite_code),
show_phone_verification_code=bool(phone_verification_code))
try:
resp = ATProto.create_account_for_migrate_out(
user, pds=pds, email=email, password=password,
handle=handle + handle_domain if handle else None,
invite_code=invite_code, phone_verification_code=phone_verification_code)
except requests.HTTPError as e:
msg = str(e)
if (e.response is not None
and common.content_type(e.response) == 'application/json'):
body = e.response.json()
msg = body.get('message') or body.get('error') or msg
flash(f'Error from {pds_domain}: {msg}')
return create_account_form()
try:
ATProto.migrate_out(user, resp['did'], to_pds=pds,
access_token=resp['accessJwt'],
refresh_token=resp['refreshJwt'],
handle=resp.get('handle'))
except ValueError as e:
flash(str(e))
return create_account_form()
# this shouldn't overwrite an existing BlueskyAuth because this account is
# currently on our PDS. (or if there is an existing BlueskyAuth, it's old,
# from a previous migration in, and obsolete.)
auth = BlueskyAuth(id=resp['did'], pds_url=pds, user_json=json_dumps(resp),
session=resp)
auth.put()
common.create_task(queue='migrate-out', user=user.key.urlsafe(),
auth=auth.key.urlsafe(), protocol=ATProto.LABEL)
flash(f"OK, we've migrated your bridged Bluesky account to <code>{resp['handle']}</code> on {pds_domain}.", escape=False)
return redirect('/settings')
@app.get(f'/<any({",".join(PROTOCOLS)}):protocol>/<id>')
# WARNING: this overrides the /ap/... actor URL route in activitypub.py, *only*
# for handles with leading @ character. be careful when changing this route!
@app.get(f'/ap/@<id>', defaults={'protocol': 'ap'})
@canonicalize_request_domain(PROTOCOL_DOMAINS, PRIMARY_DOMAIN)
@no_paging
def profile(protocol, id):
if protocol == 'ap':
id = '@' + id
user = load_user(protocol, id)
query = Object.query(Object.users == user.key)
objects, before, after = fetch_objects(query, by=Object.updated, user=user)
num_followers, num_following = user.count_followers()
return render('profile.html', **locals())
@app.get(f'/<any({",".join(PROTOCOLS)}):protocol>/<id>/home')
@canonicalize_request_domain(PROTOCOL_DOMAINS, PRIMARY_DOMAIN)
@no_paging
def home(protocol, id):
user = load_user(protocol, id)
query = Object.query(Object.feed == user.key)
objects, before, after = fetch_objects(query, by=Object.created, user=user)
# this calls Object.actor_link serially for each object, which loads the
# actor from the datastore if necessary. TODO: parallelize those fetches
return render('home.html', **locals())
@app.get(f'/<any({",".join(PROTOCOLS)}):protocol>/<id>/notifications')
@canonicalize_request_domain(PROTOCOL_DOMAINS, PRIMARY_DOMAIN)
@no_paging
def notifications(protocol, id):
user = load_user(protocol, id)
query = Object.query(Object.notify == user.key)
objects, before, after = fetch_objects(query, by=Object.updated, user=user)
format = request.args.get('format')
if format:
return serve_feed(objects=objects, format=format, as_snippets=True,
user=user, title=f'Bridgy Fed notifications for {id}',
quiet=request.args.get('quiet'))
# notifications tab UI page
return render('notifications.html', **locals())
@app.get(f'/user-page')
@flask_util.headers(CACHE_CONTROL)
def find_user_page_form():
return render('find_user_page.html')
@app.post(f'/user-page')
def find_user_page():
id = request.form['id']
proto = Protocol.for_id(id)
resolved_id = None
if not proto:
proto, resolved_id = Protocol.for_handle(id)
if not proto:
flash(f"Couldn't determine network for {id}.")
return render('find_user_page.html'), 404
try:
user = load_user(proto.LABEL, resolved_id or id)
except NotFound:
flash(f"User {id} on {proto.PHRASE} isn't signed up.")
return render('find_user_page.html'), 404
return redirect(user.user_page_path())
@app.post(f'/<any({",".join(PROTOCOLS)}):protocol>/<id>/update-profile')
@canonicalize_request_domain(PROTOCOL_DOMAINS, PRIMARY_DOMAIN)
@disable_if_read_only
def update_profile(protocol, id):
user = load_user(protocol, id)
link = f'<a href="{user.web_url()}">{user.handle_or_id()}</a>'
try:
user.reload_profile(raise_=True)
except (RuntimeError, requests.RequestException,
werkzeug.exceptions.HTTPException) as e:
_, msg = util.interpret_http_exception(e)
flash(f"Couldn't update profile for {link}: {msg}", escape=False)
return redirect(user.user_page_path())
if not user.obj:
flash(f"Couldn't update profile for {link}", escape=False)
return redirect(user.user_page_path())
# enqueue an update, not the bare profile object, so that it gets
# a unique id and doesn't get de-duped
update = user.handle_bare_object(user.obj, authed_as=user.key.id(), from_user=user)
common.create_task(queue='receive', authed_as=user.key.id(), **update.to_request())
flash(f'Updating profile from {link}...', escape=False)
if user.LABEL == 'web':
if user.status:
logger.info(f'Disabling web user {user.key.id()}')
user.delete()
else:
for label in list(user.DEFAULT_ENABLED_PROTOCOLS) + user.enabled_protocols:
try:
PROTOCOLS[label].set_username(user, id)
except (AssertionError, ValueError, RuntimeError, NotImplementedError):
pass
return redirect(user.user_page_path())
@app.get(f'/<any({",".join(PROTOCOLS)}):protocol>/<id>/<any(followers,following):collection>')
@canonicalize_request_domain(PROTOCOL_DOMAINS, PRIMARY_DOMAIN)
@memcache.memoize(expire=timedelta(hours=1),
key=lambda *args, **kwargs: (args, kwargs, request.args.to_dict()))
@flask_util.headers(CACHE_CONTROL)
@no_paging
def followers_or_following(protocol, id, collection):
user = load_user(protocol, id)
id = user.key.id()
handle = user.handle
followers, before, after = Follower.fetch_page(collection, user)
num_followers, num_following = user.count_followers()
# followers on protocols where we're not currently bridged shouldn't count.
# ideally we'd remove all of them from the count, but we don't currently have a
# good (efficient) way to include that in the query in count_followers(), so for
# now, just revise the follower count down for the ones we see in the page that
# we've fetched and will display.
#
# https://github.com/snarfed/bridgy-fed/issues/1966#issuecomment-2985666899
if num_followers <= PAGE_SIZE:
num_followers = min(num_followers, len(followers))
return render(
f'{collection}.html',
address=request.args.get('address'),
follow_url=request.values.get('url'),
**locals(),
)
[docs]
@app.get(f'/<any({",".join(PROTOCOLS)}):protocol>/<id>/feed')
@canonicalize_request_domain(PROTOCOL_DOMAINS, PRIMARY_DOMAIN)
@memcache.memoize(expire=timedelta(hours=1),
key=lambda **kwargs: (kwargs, request.args.to_dict()))
@flask_util.headers(CACHE_CONTROL)
@no_paging
def feed(protocol, id):
"""
Query params:
format: html, atom, or rss
"""
user = load_user(protocol, id)
query = Object.query(Object.feed == user.key)
objects, _, _ = fetch_objects(query, by=Object.created, user=user)
return serve_feed(objects=objects, format=request.args.get('format', 'html'),
user=user, title=f'Bridgy Fed feed for {id}')
[docs]
def serve_feed(*, objects, format, user, title, as_snippets=False, quiet=False):
"""Generates a feed based on :class:`Object` s.
Args:
objects (sequence of models.Object)
format (str): ``html``, ``atom``, or ``rss``
user (models.User)
title (str)
as_snippets (bool): if True, render short snippets for objects instead of
full contents
quiet (bool): if True, exclude follows, unfollows, likes, and reposts
Returns:
str or (str, dict) tuple: Flask response
"""
if format not in ('html', 'atom', 'rss'):
error(f'format {format} not supported; expected html, atom, or rss')
objects = [obj for obj in objects if not obj.deleted]
if quiet:
objects = [obj for obj in objects if obj.type not in
('delete', 'follow', 'stop-following', 'like', 'person', 'share',
'undo', 'update')]
if as_snippets:
activities = [{
'objectType': 'note',
'id': obj.key.id(),
'content': f'{obj.actor_link(image=False, user=user)} {obj.phrase} {obj.content}',
'updated': obj.updated.isoformat(),
'url': as1.get_url(obj.as1) or as1.get_url(as1.get_object(obj.as1)),
} for obj in objects]
else:
activities = [obj.as1 for obj in objects]
# hydrate authors, actors, objects from stored Objects
futures = []
for a in activities:
futures.extend(models.hydrate(a))
tasklets.wait_all(futures)
actor = (user.obj.as1 if user.obj and user.obj.as1
else {'displayName': user.handle, 'url': user.web_url()})
# TODO: inject/merge common.pretty_link into microformats2.render_content
# (specifically into hcard_to_html) somehow to convert Mastodon URLs to @-@
# syntax. maybe a fediverse kwarg down through the call chain?
if format == 'html':
entries = [microformats2.object_to_html(a) for a in activities]
return render('feed.html', **locals())
elif format == 'atom':
body = atom.activities_to_atom(activities, actor=actor, title=title,
request_url=request.url)
return body, {'Content-Type': atom.CONTENT_TYPE}
elif format == 'rss':
# RSS requires email to generate an author element, so fill in blank one
# where necessary
for a in activities:
for field in ('actor', 'author', 'object'):
if val := as1.get_object(a, field):
if as1.object_type(val) in as1.ACTOR_TYPES:
val.setdefault('email', '_@_._')
body = rss.from_activities(activities, actor=actor, title=title,
feed_url=request.url)
return body, {'Content-Type': rss.CONTENT_TYPE}
[docs]
@app.get(f'/<any({",".join(PROTOCOLS)}):protocol>/<user_id>/respond')
@canonicalize_request_domain(PROTOCOL_DOMAINS, PRIMARY_DOMAIN)
@require_token('respond', ['obj_id'])
def respond(user):
"""Lets a user reply to, like, or repost an unbridged post.
Query params:
obj_id (str): Object id
token (str): JWT token for user authentication
"""
if not (obj := Object.get_by_id(get_required_param('obj_id'))):
error('Object not found', status=404)
obj_as1 = copy.deepcopy(obj.as1)
as1.convert_html_content_to_text(obj_as1)
return render('respond.html', user=user, obj=obj,
obj_html=microformats2.object_to_html(obj_as1),
token=get_required_param('token'))
[docs]
@app.post(f'/<any({",".join(PROTOCOLS)}):protocol>/<user_id>/respond/reply')
@canonicalize_request_domain(PROTOCOL_DOMAINS, PRIMARY_DOMAIN)
@require_token('respond', ['obj_id'])
def respond_reply(user):
"""Creates a reply activity.
Form params:
obj_id (str): Object id to reply to
content (str): reply text content
token (str): JWT token for user authentication
"""
obj_id = get_required_param('obj_id')
if not (obj := Object.get_by_id(obj_id)):
error('Object not found', status=404)
if not (content := request.values.get('content')):
flash('Please enter a reply')
return redirect(user.user_page_path(
f'respond?obj_id={obj_id}&token={request.values["token"]}'))
id = f'ui:reply-{user.key.id()}-{obj_id}-{util.now().isoformat()}'
our_as1 = {
'objectType': 'comment',
'id': id,
'inReplyTo': obj_id,
'content': content,
'author': user.key.id(),
}
common.create_task(queue='receive', id=id, our_as1=our_as1,
source_protocol='ui', authed_as=user.key.id())
flash('Sending reply...')
return render('respond.html', user=user, obj=obj,
token=get_required_param('token'),
IFRAMELY_API_KEY_MD5=IFRAMELY_API_KEY_MD5)
[docs]
@app.post(f'/<any({",".join(PROTOCOLS)}):protocol>/<user_id>/respond/like')
@canonicalize_request_domain(PROTOCOL_DOMAINS, PRIMARY_DOMAIN)
@require_token('respond', ['obj_id'])
def respond_like(user):
"""Creates a like activity.
Form params:
obj_id (str): Object id to like
token (str): JWT token for user authentication
"""
if not (obj := Object.get_by_id(get_required_param('obj_id'))):
error('Object not found', status=404)
id = f'ui:like-{user.key.id()}-{obj.key.id()}-{util.now().isoformat()}'
our_as1 = {
'objectType': 'activity',
'verb': 'like',
'id': id,
'object': obj.key.id(),
'actor': user.key.id(),
}
common.create_task(queue='receive', id=id, our_as1=our_as1,
source_protocol='ui', authed_as=user.key.id())
flash('Sending like...')
return render('respond.html', user=user, obj=obj,
token=get_required_param('token'),
IFRAMELY_API_KEY_MD5=IFRAMELY_API_KEY_MD5)
[docs]
@app.post(f'/<any({",".join(PROTOCOLS)}):protocol>/<user_id>/respond/repost')
@canonicalize_request_domain(PROTOCOL_DOMAINS, PRIMARY_DOMAIN)
@require_token('respond', ['obj_id'])
def respond_repost(user):
"""Creates a repost/share activity.
Form params:
obj_id (str): Object id to repost
token (str): JWT token for user authentication
"""
if not (obj := Object.get_by_id(get_required_param('obj_id'))):
error('Object not found', status=404)
id = f'ui:repost-{user.key.id()}-{obj.key.id()}-{util.now().isoformat()}'
our_as1 = {
'objectType': 'activity',
'verb': 'share',
'id': id,
'object': obj.key.id(),
'actor': user.key.id(),
}
common.create_task(queue='receive', id=id, our_as1=our_as1,
source_protocol='ui', authed_as=user.key.id())
flash('Sending repost...')
return render('respond.html', user=user, obj=obj,
token=get_required_param('token'),
IFRAMELY_API_KEY_MD5=IFRAMELY_API_KEY_MD5)
[docs]
@app.post(f'/<any({",".join(PROTOCOLS)}):protocol>/<user_id>/respond/block')
@canonicalize_request_domain(PROTOCOL_DOMAINS, PRIMARY_DOMAIN)
@require_token('respond', ['obj_id'])
def respond_block(user):
"""Creates a block activity.
Form params:
obj_id (str): Object id to get the actor from to block
token (str): JWT token for user authentication
"""
if not (obj := Object.get_by_id(get_required_param('obj_id'))):
error('Object not found', status=404)
# get the actor from the object
actor = as1.get_owner(obj.as1)
if not actor:
error('No actor found in object', status=400)
id = f'ui:block-{user.key.id()}-{obj.key.id()}-{util.now().isoformat()}'
our_as1 = {
'objectType': 'activity',
'verb': 'block',
'id': id,
'object': actor,
'actor': user.key.id(),
}
common.create_task(queue='receive', id=id, our_as1=our_as1,
source_protocol='ui', authed_as=user.key.id())
flash('Blocking...')
return render('respond.html', user=user, obj=obj,
token=get_required_param('token'),
IFRAMELY_API_KEY_MD5=IFRAMELY_API_KEY_MD5)
@app.get('/log')
@canonicalize_request_domain(PROTOCOL_DOMAINS, PRIMARY_DOMAIN)
@flask_util.headers(CACHE_CONTROL)
def log():
return logs.log()
@app.get(f'/internal/<any({",".join(BLOG_REDIRECT_DOMAINS)}):host>/<path:path>')
@flask_util.headers(CACHE_CONTROL)
def blog_redirect(host, path):
return MovedPermanently(location=f'https://{host}/{path}')