icingabot/icingabot.py

365 lines
12 KiB
Python
Executable File

#! /usr/bin/env python3
#
# Simple IRC Bot statusing icinga2 via its API.
#
# François Poulain <fpoulain@metrodore.fr>
#
#
# Based on example program using irc.bot.
# from
# Joel Rosdahl <joel@rosdahl.net>
# https://github.com/jaraco/irc/raw/master/scripts/testbot.py
import configparser
import json
import os.path
import re
import sys
from itertools import groupby
import requests
import irc.bot
import irc.strings
"""A simple IRC Bot statusing icinga2.
It is based on TestBot example bot that uses the SingleServerIRCBot class from
irc.bot. The bot enters a channel and listens for commands in private messages
and channel traffic. Commands in channel messages are given by prefixing the
text by the bot name followed by a colon. Periodically, the bot read Icinga2
status and report all new KO services.
Requirements: python3-irc python3-requests
The known commands are:
"""
commands = {
"ack": {"help": "Acknowledge all services matching the query."},
"recheck": {
"help": "Recheck all services matching the query.",
"synonyms": [r"refresh"],
},
"list": {"help": "List all KO services.", "synonyms": [r"lsit", r"lits"]},
"leave": {
"help": "Disconnect the bot."
" The bot will try to reconnect after 300 seconds.",
},
"mute": {
"help": "Mute the bot (no more status report)."
" The bot will unmute after 1 hour or after receiving any command.",
"synonyms": [
r"fuck",
r"chut",
r"couché",
r"sieste",
r"t(a|o)\s*g(ueu|o)le",
],
},
"die": {"help": "Let the bot cease to exist."},
"help": {"help": "Print help."},
}
# Load configuration.
configurationFilename = "/etc/icingabot/icingabot.conf"
if os.path.isfile(configurationFilename):
config = configparser.RawConfigParser()
config.read(configurationFilename)
settings = {
"ircsrv": config.get("irc", "irc.server"),
"ircchan": config.get("irc", "irc.chan"),
"ircport": config.getint("irc", "irc.port"),
"ircnick": config.get("irc", "irc.nick"),
"irccmd": config.get("irc", "irc.cmd_prefix"),
# see /etc/icinga2/conf.d/api-users.conf
"icinga2user": config.get("icinga2", "icinga2.user"),
"icinga2pass": config.get("icinga2", "icinga2.password"),
"icinga2ca": config.get("icinga2", "icinga2.ca"),
"icinga2fqdn": config.get("icinga2", "icinga2.fqdn"),
"icinga2port": config.getint("icinga2", "icinga2.port"),
}
else:
print("Missing configuration file [/etc/icingabot/icingabot.conf].")
sys.exit()
class Icinga2ServiceManager:
ko_services = []
def build_request_url(self, uri, params={}):
# Since icinga2 wants « URL-encoded strings » but requests
# seems to makes « application/x-www-form-urlencoded »,
# so we munge params and shortcut urlencode.
# See
# https://www.icinga.com/docs/icinga2/latest/doc/12-icinga2-api/#parameters
url = "https://{}:{}{}".format(
settings["icinga2fqdn"], settings["icinga2port"], uri
)
return (
requests.Request("GET", url, params=params)
.prepare()
.url.replace("+", "%20")
)
def fetch_ko_services(self):
headers = {
"Accept": "application/json",
"X-HTTP-Method-Override": "GET",
}
data = {
"attrs": ["last_check_result", "display_name", "host_name", "acknowledgement"],
"filter": "service.state!=ServiceOK",
}
try:
r = requests.post(
self.build_request_url("/v1/objects/services"),
headers=headers,
auth=(settings["icinga2user"], settings["icinga2pass"]),
data=json.dumps(data),
verify=settings["icinga2ca"],
)
if r.status_code == 200:
new_ko_services = [
n for n in r.json()["results"] if n is not None
]
news = [
n
for n in new_ko_services
if n["name"] not in [nn["name"] for nn in self.ko_services]
]
lost = [
n
for n in self.ko_services
if n["name"] not in [nn["name"] for nn in new_ko_services]
]
self.ko_services = new_ko_services
return (lost, news)
except Exception as e:
self.send("Unable to fetch from Icinga2: {}".format(e))
return (False, False)
def post_on_services(self, pattern, uri, data={}):
headers = {
"Accept": "application/json",
"X-HTTP-Method-Override": "POST",
}
params = {
"type": "Service",
"filter": "service.state!=ServiceOK"
}
if pattern:
params['filter'] += '&& match("*{}*", service.__name)'.format(pattern)
try:
r = requests.post(
self.build_request_url(uri, params=params),
headers=headers,
auth=(settings["icinga2user"], settings["icinga2pass"]),
data=json.dumps(data),
verify=settings["icinga2ca"],
)
for a in r.json()["results"]:
self.send(a["status"])
if pattern is not None and not r.json()["results"]:
self.send("No matching service for « {} »".format(pattern))
except Exception as e:
self.send("Unable to post to Icinga2: {}".format(e))
def ack_service(self, pattern, comment, nick):
data = {
"author": nick,
"comment": comment or " ", # never "" !
}
self.post_on_services(pattern, '/v1/actions/acknowledge-problem', data)
def recheck_service(self, pattern):
self.post_on_services(pattern, '/v1/actions/reschedule-check')
class IcingaBot(Icinga2ServiceManager, irc.bot.SingleServerIRCBot):
args = ""
muted = False
nick_suffix = ""
def __init__(self, channel, nickname, server, port=6667):
irc.bot.SingleServerIRCBot.__init__(
self,
[(server, port)],
nickname,
nickname,
reconnection_interval=300,
)
self.nick = nickname
self.channel = channel
self.connection.execute_every(30, self.refresh_ko_services)
self.refresh_ko_services()
def suffix_nick(self, suffix):
self.nick_suffix = suffix
if self.connection.is_connected():
self.connection.nick("{}{}".format(self.nick, suffix))
def unmute(self):
self.muted = False
if self.ko_services:
self.suffix_nick("[{}]".format(len(self.ko_services)))
else:
self.suffix_nick("")
def send(self, msg):
if not self.muted and self.connection.is_connected():
for line in msg.split("\n"):
self.connection.privmsg(self.channel, line)
def refresh_ko_services(self):
lost, news = self.fetch_ko_services()
if lost is False and news is False:
return
if self.ko_services:
self.suffix_nick("[{}]".format(len(self.ko_services)))
else:
self.suffix_nick("")
return
def on_nicknameinuse(self, c, e):
c.nick(+"_")
def on_welcome(self, c, e):
c.join(self.channel)
def on_privmsg(self, c, e):
self.do_command(e, e.arguments[0])
def on_pubmsg(self, c, e):
if e.arguments[0].startswith(settings["irccmd"]):
self.do_command(e, e.arguments[0][1:])
return
a = e.arguments[0].split(":", 1)
if (
len(a) > 1
and a[0].lower() == self.connection.get_nickname().lower()
):
self.do_command(e, a[1].strip())
return
def do_cmd(self, s):
self.args = None
tokens = s.split(" ", 1)
cmd = tokens[0]
if len(tokens) > 1:
self.args = tokens[1].strip()
for k, v in commands.items():
if cmd == k and hasattr(self, "do_" + cmd):
return getattr(self, "do_" + cmd)
if "synonyms" in v:
for s in v["synonyms"]:
if re.match(s, cmd) and hasattr(self, "do_" + k):
return getattr(self, "do_" + k)
return False
def do_help(self, c, e):
for k, v in commands.items():
self.send("{} -- {}".format(k, v["help"]))
if "synonyms" in v:
self.send(" synonyms: {}".format(", ".join(v["synonyms"])))
def do_die(self, c, e):
print("Ok master, I die. Aaargh...")
self.die()
def do_leave(self, c, e):
self.disconnect()
def get_unack_ko_services(self):
return [s for s in self.ko_services if not s["attrs"]["acknowledgement"]]
def regrouped_ko_services(self):
def regroup_key(elem):
return elem["attrs"]["display_name"]
return [
(group, [service["attrs"]["host_name"] for service in services])
for group, services in groupby(
sorted(self.get_unack_ko_services(), key=regroup_key), regroup_key
)
]
def do_list(self, c, e):
if self.ko_services:
host_by_service = [
(service, [hostname.split(".")[0] for hostname in hosts])
for service, hosts in sorted(
self.regrouped_ko_services(), key=lambda x: -len(x[1])
)
]
self.send(
"\n".join(
[
"{} ({}): {}".format(
service, len(hostnames), ", ".join(hostnames)
)
for service, hostnames in host_by_service
]
)
)
acknowledged = [s for s in self.ko_services if s["attrs"]["acknowledgement"]]
if acknowledged:
self.send(
"Acknowledged ({}): {}".format(
len(acknowledged),
', '.join({s["attrs"]["display_name"] for s in acknowledged})
)
)
else:
self.send("Nothing particularly exciting.")
def do_ack(self, c, e):
if self.args is None:
self.send(
e.source.nick + ": usage: !ack <pattern or all> [: comment]"
)
return
tokens = self.args.split(":", 1)
pattern, comment = tokens[0].strip(), ""
if len(tokens) > 1:
comment = tokens[1].strip()
if pattern == "all":
pattern = None
self.ack_service(pattern, comment, e.source.nick)
def do_recheck(self, c, e):
if self.args == "all":
self.args = None
self.recheck_service(self.args)
self.connection.execute_delayed(15, self.refresh_ko_services)
def do_mute(self, c, e):
self.muted = True
self.suffix_nick("[zZz]")
self.connection.execute_delayed(3600, self.refresh_ko_services)
def do_command(self, e, cmd):
nick = e.source.nick
c = self.connection
if self.do_cmd(cmd):
self.unmute()
(self.do_cmd(cmd))(c, e)
else:
self.send(nick + ": Not understood: " + cmd)
def main():
bot = IcingaBot(
settings["ircchan"],
settings["ircnick"],
settings["ircsrv"],
settings["ircport"],
)
bot.start()
if __name__ == "__main__":
main()