#!/usr/bin/env python3
"""Warn owners of Chapril forge repositories that lack a licence.

For every repository URL handed to :func:`notify`, the owners' email
addresses are looked up (by default in the gitea database), a French
notification mail is sent through the given SMTP session and a private note
is appended to a tracking issue on the agir.april.org Redmine instance.
"""
import configparser
import datetime
from email.message import EmailMessage
import fileinput
import functools
import getpass
import json
import logging
import smtplib
from time import sleep
from string import Template
from urllib.parse import urlsplit

import requests

SUBJECT = "Licence du dépôt '$repository' hébergé sur la forge Chapril"
SUBJECT_TMPL = Template(SUBJECT)

# NOTE(review): the line breaks of this template were reconstructed from a
# whitespace-collapsed copy of the file; the wording is unchanged but the
# exact layout should be confirmed against the text actually in use.
MAIL = (
    """\
Bonjour,

Je suis animateur bénévole du service forge.chapril.org.
Sauf erreur de ma part, le dépôt '$repository' [1] ne spécifie pas clairement
de licence et n'est donc pas conforme aux conditions générales d'utilisation
du service [2].

Nous vous demandons de bien vouloir vous conformer aux conditions générales
d'utilisation du service, à défaut ce dépôt sera supprimé dans 15 jours.

Nous sommes heureux que nos services numériques libres, éthiques et loyaux
puissent vous être utiles.

[1] https://forge.chapril.org/$user/$repository
[2] https://www.chapril.org/cgu.html#forgechaprilorg-cpu

En vous souhaitant, au nom de toute l'équipe du Chapril, un agréable usage de
nos services libres, éthique et loyaux.

"""
    # Signature separator, written as an explicit literal so that the
    # trailing space cannot be stripped by an editor.
    "-- \n$animsys"
)
MAIL_TMPL = Template(MAIL)

AGIR_URL = "https://agir.april.org"
FORGE_ML = "forge-support@chapril.org"

# Seconds to wait for the Redmine API before giving up; without a timeout
# ``requests`` may block forever on an unresponsive server.
HTTP_TIMEOUT = 30


def get_db_cursor():
    """Return a cursor on the gitea database, on a read-only session.

    Connection parameters come from :func:`db_config_from_gitea`.

    Raises:
        RuntimeError: if the session could not be switched to read-only.
    """
    import psycopg2

    conn = psycopg2.connect(**db_config_from_gitea(), target_session_attrs="read-only")
    conn.set_session(readonly=True)
    # Explicit check rather than ``assert`` so that it survives ``python -O``.
    if not conn.readonly:
        conn.close()
        raise RuntimeError("Connection to the gitea database is not read-only")
    logging.debug("Connected to gitea database")
    return conn.cursor()


def _split_host_port(host):
    """Split a gitea ``HOST`` value into ``(host, port)``; port may be None.

    Handles ``name``, ``name:port`` and bracketed IPv6 (``[addr]`` or
    ``[addr]:port``). Any other value (socket path, bare IPv6 address) is
    returned untouched with no port.
    """
    if host.startswith("["):
        address, _, rest = host[1:].partition("]")
        return address, rest.lstrip(":") or None
    if host.count(":") == 1:
        name, port = host.split(":")
        return name, port
    return host, None


def db_config_from_gitea(config_path="/etc/gitea/gitea.ini"):
    """Build psycopg2 connection keyword arguments from gitea's ini file.

    Args:
        config_path: location of the gitea configuration file.

    Returns:
        A dict with ``options`` and ``host`` plus, when present in the
        ``[database]`` section, ``port``, ``sslmode``, ``dbname``, ``user``
        and ``password``.

    Raises:
        KeyError: if the file has no ``[database]`` section or no ``host``.
    """
    config = configparser.ConfigParser()
    with open(config_path, encoding="utf-8") as giteacfg:
        # gitea.ini may start with settings that belong to no section, which
        # configparser rejects: skip lines until the first section header.
        while True:
            pos = giteacfg.tell()
            line = giteacfg.readline()
            if not line:
                # End of file without any section: stop instead of spinning.
                break
            probe = configparser.ConfigParser()
            try:
                probe.read_string(line)
            except configparser.MissingSectionHeaderError:
                continue
            if probe.sections():
                break
        # Rewind to the start of the header line and parse from there.
        giteacfg.seek(pos)
        config.read_file(giteacfg)

    database = config["database"]
    db_params = {
        "options": "-c default_transaction_read_only=on",
    }
    db_host, db_port = _split_host_port(database["host"])
    db_params["host"] = db_host
    if db_port is not None:
        db_params["port"] = db_port
    # (psycopg2 keyword, gitea option name)
    for param, gitea_param in (
        ("sslmode", "ssl_mode"),
        ("dbname", "name"),
        ("user", "user"),
        ("password", "passwd"),
    ):
        if gitea_param in database:
            db_params[param] = database[gitea_param]
    schema = database.get("schema")
    if schema:
        db_params["options"] += f" -c search_path={schema}"
    return db_params


def fetch_mail_from_db(cur, user):
    """Return the list of email addresses to contact for ``user``.

    Args:
        cur: database cursor on the gitea database.
        user: name of a gitea user or organization.

    Returns:
        ``[email]`` when the account has an email address, otherwise the
        addresses of the members of the account's "owners" team.

    Raises:
        LookupError: if the account, or its "owners" team, does not exist.
    """
    query = "select id, email from public.user where name = %s;"
    cur.execute(query, (user,))
    row = cur.fetchone()
    if row is None:
        raise LookupError(f"No user or organization named '{user}'")
    uid, mail = row
    if mail:
        return [mail]

    # Organizations don't have an email address
    logging.debug("'%s' is an organization", user)
    # Fetch the id of the organization's "owners" team
    query = "select id from public.team where org_id = %s and lower_name = 'owners';"
    cur.execute(query, (uid,))
    team = cur.fetchone()
    if team is None:
        raise LookupError(f"No 'owners' team found for organization '{user}'")
    query = (
        "select public.user.email from team_user INNER JOIN public.user"
        " ON team_user.uid = public.user.id where team_user.team_id = %s;"
    )
    cur.execute(query, (team[0],))
    return [x[0] for x in cur.fetchall()]


def _split_repository_url(url):
    """Return ``[user, repository]`` taken from the path of ``url``.

    Raises:
        ValueError: if the two leading path components cannot be found.
    """
    if "/" not in url:
        raise ValueError(f"URL not recognized ({url})")
    parts = urlsplit(url).path.split("/")[1:3]
    if len(parts) != 2 or not all(parts):
        raise ValueError(f"Can not fetch user and repository from {url}")
    return parts


def notify(smtp, repositories, agir_key, fetch_mail):
    """Mail the owners of each repository and log it on the tracking issue.

    Args:
        smtp: object with a ``send_message`` method (e.g. ``smtplib.SMTP``).
        repositories: iterable of repository URLs; blank entries are skipped.
        agir_key: Redmine API key used against ``AGIR_URL``.
        fetch_mail: callable mapping a user/organization name to a list of
            email addresses.

    Raises:
        ValueError: if a URL does not contain a user and a repository.
        LookupError: if no owner address is found for a repository.
        requests.HTTPError: if a Redmine API call fails.
    """
    headers = {"X-Redmine-API-Key": agir_key, "Content-Type": "application/json"}
    today = datetime.date.today()

    # Identify who is running the script; the name is used as mail signature.
    req = requests.get(
        f"{AGIR_URL}/users/current.json", headers=headers, timeout=HTTP_TIMEOUT
    )
    req.raise_for_status()
    sender = req.json()["user"]
    if "firstname" in sender and "lastname" in sender:
        animsys = f"{sender['firstname']} {sender['lastname']} ({sender['login']})"
    else:
        animsys = sender["login"]

    issue_api = f"{AGIR_URL}/issues/5615.json"
    for url in repositories:
        url = url.strip()
        if not url:
            continue
        user, repository = _split_repository_url(url)
        user_emails = fetch_mail(user)
        if not user_emails:
            raise LookupError(f"Unable to identify owners of {url}")
        logging.debug("Mail for '%s' is '%s'", user, " ".join(user_emails))

        email = EmailMessage()
        email.set_content(
            MAIL_TMPL.substitute(user=user, repository=repository, animsys=animsys)
        )
        email["Subject"] = SUBJECT_TMPL.substitute(repository=repository)
        email["From"] = FORGE_ML
        email["To"] = ", ".join(user_emails)
        email["CC"] = FORGE_ML
        smtp.send_message(email)
        logging.debug("Mail to '%s' (%s) sent for %s.", user, email["To"], url)

        # Serialize after filling in the values so that quotes or backslashes
        # in them are escaped and the payload is always valid JSON.
        note = json.dumps(
            {
                "issue": {
                    "private_notes": True,
                    "notes": f'🔲 "{repository}":{url}: @{user}@ notified by mail '
                    f"({today})",
                }
            }
        )
        req = requests.put(issue_api, data=note, headers=headers, timeout=HTTP_TIMEOUT)
        req.raise_for_status()
        logging.debug("Note added to %s for %s", issue_api, url)
        sleep(1)  # throttle mail and web notifications!


def main():
    """Run the notification campaign from the command line.

    Prompts for the Redmine API key, reads repository URLs through
    ``fileinput.input()``, then sends the notifications through the SMTP
    server at 127.0.0.1:25, resolving owner addresses from the gitea
    database.
    """
    logging.basicConfig(level=logging.DEBUG)

    # In order to use another SMTP server (in such case, fetch_mail may need to
    # be re-implemented using the Gitea API)
    #
    # import ssl
    # server = input("type your smtp server: ")
    # login = input("type your smtp login: ")
    # password = getpass.getpass(prompt="type your password and press enter:")
    # context = ssl.create_default_context(cafile="/usr/local/share/ca-certificates/own.crt")
    # with smtplib.SMTP_SSL(server, 465, context=context) as smtp:
    #     smtp.login(login, password)

    agir_key = getpass.getpass(prompt=f"Type {AGIR_URL} API key:")
    # The context manager closes the input stream once every URL is handled.
    with fileinput.input() as repositories:
        with smtplib.SMTP("127.0.0.1", 25) as smtp:
            with get_db_cursor() as cursor:
                fetch_mail = functools.partial(fetch_mail_from_db, cursor)
                notify(smtp, repositories, agir_key, fetch_mail)


if __name__ == "__main__":
    main()