216 lines
6.6 KiB
Python
Executable File
216 lines
6.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import configparser
|
|
import datetime
|
|
from email.message import EmailMessage
|
|
import fileinput
|
|
import functools
|
|
import getpass
|
|
import json
|
|
import logging
|
|
import smtplib
|
|
from time import sleep
|
|
from string import Template
|
|
from urllib.parse import urlsplit
|
|
|
|
import requests
|
|
|
|
SUBJECT = "Licence du dépôt '$repository' hébergé sur la forge Chapril"
|
|
SUBJECT_TMPL = Template(SUBJECT)
|
|
|
|
MAIL = """\
|
|
Bonjour,
|
|
|
|
Je suis animateur bénévole du service forge.chapril.org.
|
|
|
|
Sauf erreur de ma part, le dépôt '$repository' [1]
|
|
ne spécifie pas clairement de licence et n'est donc pas conforme aux
|
|
conditions générales d'utilisation du service [2].
|
|
|
|
Nous vous demandons de bien vouloir vous conformer aux conditions
|
|
générales d'utilisation du service, à défaut ce dépôt sera supprimé dans
|
|
15 jours.
|
|
|
|
Nous sommes heureux que nos services numériques libres, éthiques et loyaux
|
|
puissent vous être utiles.
|
|
|
|
[1] https://forge.chapril.org/$user/$repository
|
|
[2] https://www.chapril.org/cgu.html#forgechaprilorg-cpu
|
|
|
|
En vous souhaitant, au nom de toute l'équipe du Chapril, un agréable
|
|
usage de nos services libres, éthique et loyaux.
|
|
|
|
--
|
|
$animsys"""
|
|
MAIL_TMPL = Template(MAIL)
|
|
|
|
AGIR_URL = "https://agir.april.org"
|
|
FORGE_ML = "forge-support@chapril.org"
|
|
|
|
|
|
def get_db_cursor():
|
|
import psycopg2
|
|
|
|
conn = psycopg2.connect(**db_config_from_gitea(), target_session_attrs="read-only")
|
|
conn.set_session(readonly=True)
|
|
assert conn.readonly
|
|
logging.debug("Connected to gitea database")
|
|
return conn.cursor()
|
|
|
|
|
|
def db_config_from_gitea():
|
|
config = configparser.ConfigParser()
|
|
with open("/etc/gitea/gitea.ini", encoding="utf-8") as giteacfg:
|
|
# skip the first lines without a section
|
|
while True:
|
|
pos = giteacfg.tell()
|
|
line = giteacfg.readline()
|
|
try:
|
|
config.read_string(line)
|
|
except configparser.MissingSectionHeaderError:
|
|
pass
|
|
|
|
if config.sections():
|
|
config.clear()
|
|
break
|
|
giteacfg.seek(pos)
|
|
config.read_file(giteacfg)
|
|
|
|
db_params = {
|
|
"options": "-c default_transaction_read_only=on",
|
|
}
|
|
|
|
if ":" in config["database"]["host"]:
|
|
db_host, db_port = config["database"]["host"].split(":")
|
|
db_params["host"] = db_host
|
|
db_params["port"] = db_port
|
|
else:
|
|
db_params["host"] = config["database"]["host"]
|
|
|
|
for param, gitea_param in (
|
|
("sslmode", "ssl_mode"),
|
|
("dbname", "name"),
|
|
("user", "user"),
|
|
("password", "passwd"),
|
|
):
|
|
if gitea_param in config["database"]:
|
|
db_params[param] = config["database"][gitea_param]
|
|
|
|
if "schema" in config["database"] and config["database"]["schema"]:
|
|
db_params["options"] += f' -c search_path={config["database"]["schema"]}'
|
|
|
|
return db_params
|
|
|
|
|
|
def fetch_mail_from_db(cur, user):
|
|
query = "select id, email from public.user where name = %s;"
|
|
cur.execute(query, (user,))
|
|
uid, mail = cur.fetchone()
|
|
owners = [mail]
|
|
|
|
# Organizations don't have an email address
|
|
if not mail:
|
|
logging.debug("'%s' is an organization", user)
|
|
# Fetch organization id
|
|
query = (
|
|
"select id from public.team where org_id = %s and lower_name = 'owners';"
|
|
)
|
|
cur.execute(query, (uid,))
|
|
team_id = cur.fetchone()[0]
|
|
|
|
query = (
|
|
"select public.user.email from team_user INNER JOIN public.user"
|
|
" ON team_user.uid = public.user.id where team_user.team_id = %s;"
|
|
)
|
|
cur.execute(query, (team_id,))
|
|
owners = [x[0] for x in cur.fetchall()]
|
|
|
|
return owners
|
|
|
|
|
|
def notify(smtp, repositories, agir_key, fetch_mail):
|
|
headers = {"X-Redmine-API-Key": agir_key, "Content-Type": "application/json"}
|
|
note = Template(
|
|
json.dumps(
|
|
{
|
|
"issue": {
|
|
"private_notes": True,
|
|
"notes": f'🔲 "$repository":$url: @$user@ notified by mail '
|
|
f"({datetime.date.today()})",
|
|
}
|
|
}
|
|
)
|
|
)
|
|
|
|
req = requests.get(f"{AGIR_URL}/users/current.json", headers=headers)
|
|
req.raise_for_status()
|
|
sender = req.json()["user"]
|
|
if "firstname" in sender and "lastname" in sender:
|
|
animsys = f"{sender['firstname']} {sender['lastname']} " f"({sender['login']})"
|
|
else:
|
|
animsys = sender["login"]
|
|
|
|
for url in repositories:
|
|
url = url.strip()
|
|
if not url:
|
|
continue
|
|
if not "/" in url:
|
|
raise ValueError(f"URL not recognized ({url})")
|
|
user, repository = urlsplit(url).path.split("/")[1:3]
|
|
assert user and repository, f"Can not fetch user and repository from {url}"
|
|
user_emails = fetch_mail(user)
|
|
assert user_emails, f"Unable to identify owners of {url}"
|
|
logging.debug("Mail for '%s' is '%s'", user, " ".join(user_emails))
|
|
subject = SUBJECT_TMPL.substitute(repository=repository)
|
|
message = MAIL_TMPL.substitute(
|
|
user=user, repository=repository, animsys=animsys
|
|
)
|
|
|
|
email = EmailMessage()
|
|
email.set_content(message)
|
|
email["Subject"] = subject
|
|
email["From"] = FORGE_ML
|
|
email["To"] = ", ".join(user_emails)
|
|
email["CC"] = FORGE_ML
|
|
|
|
smtp.send_message(email)
|
|
logging.debug("Mail to '%s' (%s) sent for %s.", user, email["To"], url)
|
|
|
|
issue_api = f"{AGIR_URL}/issues/5615.json"
|
|
req = requests.put(
|
|
issue_api,
|
|
data=note.substitute(user=user, repository=repository, url=url),
|
|
headers=headers,
|
|
)
|
|
req.raise_for_status()
|
|
logging.debug("Note added to %s for %s", issue_api, url)
|
|
|
|
sleep(1) # throttle mail and web notifications!
|
|
|
|
|
|
def main():
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
# In order to use another SMTP server (in such case, fetch_mail may need to
|
|
# be re-implemented using the Gitea API)
|
|
#
|
|
# import ssl
|
|
# server = input("type your smtp server: ")
|
|
# login = input("type your smtp login: ")
|
|
# password = getpass.getpass(prompt="type your password and press enter:")
|
|
# context = ssl.create_default_context(cafile="/usr/local/share/ca-certificates/own.crt")
|
|
# with smtplib.SMTP_SSL(server, 465, context=context) as smtp:
|
|
# smtp.login(login, password)
|
|
|
|
agir_key = getpass.getpass(prompt=f"Type {AGIR_URL} API key:")
|
|
|
|
repositories = fileinput.input()
|
|
with smtplib.SMTP("127.0.0.1", 25) as smtp:
|
|
with get_db_cursor() as cursor:
|
|
fetch_mail = functools.partial(fetch_mail_from_db, cursor)
|
|
notify(smtp, repositories, agir_key, fetch_mail)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|