#! /usr/bin/env python3 # # Simple IRC Bot statusing icinga2 via its API. # # François Poulain # # # Based on example program using irc.bot. # from # Joel Rosdahl # https://github.com/jaraco/irc/raw/master/scripts/testbot.py import configparser import json import os.path import re import sys import requests import irc.bot import irc.strings """A simple IRC Bot statusing icinga2. It is based on TestBot example bot that uses the SingleServerIRCBot class from irc.bot. The bot enters a channel and listens for commands in private messages and channel traffic. Commands in channel messages are given by prefixing the text by the bot name followed by a colon. Periodically, the bot read Icinga2 status and report all new KO services. Requirements: python3-irc python3-requests The known commands are: """ commands = { "ack": {"help": "Acknowledge a given service."}, "recheck": { "help": "Recheck a given service or all services.", "synonyms": [r"refresh"], }, "list": {"help": "List all KO services.", "synonyms": [r"lsit", r"lits"]}, "leave": { "help": "Disconnect the bot." " The bot will try to reconnect after 300 seconds.", }, "mute": { "help": "Mute the bot (no more status report)." " The bot will unmute after 1 hour or after receiving any command.", "synonyms": [ r"fuck", r"chut", r"couché", r"sieste", r"t(a|o)\s*g(ueu|o)le", ], }, "die": {"help": "Let the bot cease to exist."}, "help": {"help": "Print help."}, } # Load configuration. configurationFilename = "/etc/icingabot/icingabot.conf" if os.path.isfile(configurationFilename): config = configparser.RawConfigParser() config.read(configurationFilename) settings = { "ircsrv": config.get("irc", "irc.server"), "ircchan": config.get("irc", "irc.chan"), "ircport": config.getint("irc", "irc.port"), "ircnick": config.get("irc", "irc.nick"), "irccmd": config.get("irc", "irc.cmd_prefix"), # see /etc/icinga2/conf.d/api-users.conf "icinga2user": config.get("icinga2", "icinga2.user"), "icinga2pass": config.get("icinga2", "icinga2.password"), "icinga2ca": config.get("icinga2", "icinga2.ca"), "icinga2fqdn": config.get("icinga2", "icinga2.fqdn"), "icinga2port": config.getint("icinga2", "icinga2.port"), } else: print("Missing configuration file [/etc/icingabot/icingabot.conf].") sys.exit() class Icinga2ServiceManager: notifications = [] def build_request_url(self, uri, params={}): # Since icinga2 wants « URL-encoded strings » but requests # seems to makes « application/x-www-form-urlencoded », # so we munge params and shortcut urlencode. # See # https://www.icinga.com/docs/icinga2/latest/doc/12-icinga2-api/#parameters url = "https://{}:{}{}".format( settings["icinga2fqdn"], settings["icinga2port"], uri ) return ( requests.Request("GET", url, params=params) .prepare() .url.replace("+", "%20") ) def fetch_notifications(self): headers = { "Accept": "application/json", "X-HTTP-Method-Override": "GET", } data = { "attrs": ["last_check_result"], "filter": "service.state!=ServiceOK", } try: r = requests.post( self.build_request_url("/v1/objects/services"), headers=headers, auth=(settings["icinga2user"], settings["icinga2pass"]), data=json.dumps(data), verify=settings["icinga2ca"], ) if r.status_code == 200: new_notifications = [ n for n in r.json()["results"] if n is not None ] news = [ n for n in new_notifications if n["name"] not in [nn["name"] for nn in self.notifications] ] lost = [ n for n in self.notifications if n["name"] not in [nn["name"] for nn in new_notifications] ] self.notifications = new_notifications return (lost, news) except Exception: self.send("Unable to fetch from Icinga2") return (False, False) def ack_service(self, srv, comment, nick): # weird but needed: if comment == "": comment = " " # /weird headers = { "Accept": "application/json", "X-HTTP-Method-Override": "POST", } data = { "author": nick, "comment": comment, } params = { "type": "Service", "filter": ( 'service.__name=="{}"'.format(srv) if srv is not None else "service.state!=ServiceOK" ), } try: r = requests.post( self.build_request_url( "/v1/actions/acknowledge-problem", params=params ), headers=headers, auth=(settings["icinga2user"], settings["icinga2pass"]), data=json.dumps(data), verify=settings["icinga2ca"], ) if r.status_code == 200: for a in r.json()["results"]: if a["code"] == 200.0: self.send(a["status"]) if srv is not None and not r.json()["results"]: self.send("No result for service name « {} »".format(srv)) else: self.send("{} for service name « {} »".format(r.text, srv)) except Exception: self.send("Unable to post to Icinga2") def recheck_service(self, srv): headers = { "Accept": "application/json", "X-HTTP-Method-Override": "POST", } params = { "type": "Service", "filter": ( 'service.__name=="{}"'.format(srv) if srv is not None else "service.state!=ServiceOK" ), } try: r = requests.post( self.build_request_url( "/v1/actions/reschedule-check", params=params ), headers=headers, auth=(settings["icinga2user"], settings["icinga2pass"]), verify=settings["icinga2ca"], ) if r.status_code == 200: for a in r.json()["results"]: if a["code"] == 200.0: self.send(a["status"]) if srv is not None and not r.json()["results"]: self.send("No result for service name « {} »".format(srv)) else: self.send("{} for service name « {} »".format(r.text, srv)) except Exception: self.send("Unable to post to Icinga2") class IcingaBot(Icinga2ServiceManager, irc.bot.SingleServerIRCBot): args = "" muted = False nick_suffix = "" def __init__(self, channel, nickname, server, port=6667): irc.bot.SingleServerIRCBot.__init__( self, [(server, port)], nickname, nickname, reconnection_interval=300, ) self.nick = nickname self.channel = channel self.connection.execute_every(30, self.refresh_notifications) self.refresh_notifications() def suffix_nick(self, suffix): self.nick_suffix = suffix if self.connection.is_connected(): self.connection.nick("{}{}".format(self.nick, suffix)) def unmute(self): self.muted = False if self.notifications: self.suffix_nick("[{}]".format(len(self.notifications))) else: self.suffix_nick("") def send(self, msg): if not self.muted and self.connection.is_connected(): for line in msg.split("\n"): self.connection.privmsg(self.channel, line) def refresh_notifications(self): lost, news = self.fetch_notifications() if lost is False and news is False: return if self.notifications: self.suffix_nick("[{}]".format(len(self.notifications))) else: self.suffix_nick("") for srv in lost: if srv is not None: self.send("{} is OK".format(srv["name"])) for srv in news: try: self.send( "{}: => {}".format( srv["name"], srv["attrs"]["last_check_result"]["output"], ) ) except Exception: self.send("{}: => No check result.".format(srv["name"])) def on_nicknameinuse(self, c, e): c.nick(+"_") def on_welcome(self, c, e): c.join(self.channel) def on_privmsg(self, c, e): self.do_command(e, e.arguments[0]) def on_pubmsg(self, c, e): if e.arguments[0].startswith(settings["irccmd"]): self.do_command(e, e.arguments[0][1:]) return a = e.arguments[0].split(":", 1) if ( len(a) > 1 and a[0].lower() == self.connection.get_nickname().lower() ): self.do_command(e, a[1].strip()) return def do_cmd(self, s): self.args = None tokens = s.split(" ", 1) cmd = tokens[0] if len(tokens) > 1: self.args = tokens[1].strip() for k, v in commands.items(): if cmd == k and hasattr(self, "do_" + cmd): return getattr(self, "do_" + cmd) if "synonyms" in v: for s in v["synonyms"]: if re.match(s, cmd) and hasattr(self, "do_" + k): return getattr(self, "do_" + k) return False def do_help(self, c, e): for k, v in commands.items(): self.send("{} -- {}".format(k, v["help"])) if "synonyms" in v: self.send(" synonyms: {}".format(", ".join(v["synonyms"]))) def do_die(self, c, e): print("Ok master, I die. Aaargh...") self.die() def do_leave(self, c, e): self.disconnect() def do_list(self, c, e): if self.notifications: for srv in self.notifications: try: self.send( "{}: => {}".format( srv["name"], srv["attrs"]["last_check_result"]["output"], ) ) except Exception: self.send("{}: => No check result.".format(srv["name"])) else: self.send("Nothing particularly exciting.") def do_ack(self, c, e): if self.args is None: self.send( e.source.nick + ": usage: !ack [: comment]" ) return tokens = self.args.split(":", 1) srv, comment = tokens[0].strip(), "" if len(tokens) > 1: comment = tokens[1].strip() if srv == "all": srv = None self.ack_service(srv, comment, e.source.nick) def do_recheck(self, c, e): if self.args == "all": self.args = None self.recheck_service(self.args) self.connection.execute_delayed(1, self.refresh_notifications) def do_mute(self, c, e): self.muted = True self.suffix_nick("[zZz]") self.connection.execute_delayed(3600, self.refresh_notifications) def do_command(self, e, cmd): nick = e.source.nick c = self.connection if self.do_cmd(cmd): self.unmute() (self.do_cmd(cmd))(c, e) else: self.send(nick + ": Not understood: " + cmd) def main(): bot = IcingaBot( settings["ircchan"], settings["ircnick"], settings["ircsrv"], settings["ircport"], ) bot.start() if __name__ == "__main__": main()