diff --git a/icingabot.py b/icingabot.py new file mode 100755 index 0000000..3eaec44 --- /dev/null +++ b/icingabot.py @@ -0,0 +1,321 @@ +#! /usr/bin/env python3 +# +# Simple IRC Bot statusing icinga2 via its API. +# +# François Poulain +# +# +# Based on example program using irc.bot. +# from +# Joel Rosdahl +# https://github.com/jaraco/irc/raw/master/scripts/testbot.py + +"""A simple IRC Bot statusing icinga2. + +It is based on TestBot example bot that uses the SingleServerIRCBot class from +irc.bot. The bot enters a channel and listens for commands in private messages +and channel traffic. Commands in channel messages are given by prefixing the +text by the bot name followed by a colon. Periodically, the bot read Icinga2 +status and report all new KO services. + +Requirements: python3-irc python3-requests + +The known commands are: + """ +commands = { + "ack" : { + "help": "Acknowledge a given service.", + }, + "recheck" : { + "help": "Recheck a given service or all services.", + "synonyms" : [r"refresh"], + }, + "list" : { + "help": "List all KO services.", + "synonyms" : [r"lsit", r"lits"], + }, + "leave" : { + "help": "Disconnect the bot. The bot will try to reconnect after 300 seconds.", + }, + "mute" : { + "help": "Mute the bot (no more status report). The bot will unmute after 1 hour or after receiving any command.", + "synonyms" : [r"fuck", r"chut", r"couché", r"sieste", r"t(a|o)\s*g(ueu|o)le"], + }, + "die" : { + "help": "Let the bot cease to exist.", + }, + "help" : { + "help": "Print help.", + }, + } + +import os.path +import irc.bot +import irc.strings +import re +import requests, json +import configparser + +# Load configuration. +configurationFilename="/etc/icingabot/icingabot.conf" +if os.path.isfile(configurationFilename): + config = configparser.RawConfigParser() + config.read(configurationFilename) + + settings = { + "ircsrv": config.get("irc", "irc.server"), + "ircchan": config.get("irc", "irc.chan"), + "ircport": config.getint("irc", "irc.port"), + "ircnick": config.get("irc", "irc.nick"), + "irccmd": config.get("irc", "irc.cmd_prefix"), + + # see /etc/icinga2/conf.d/api-users.conf + "icinga2user" : config.get("icinga2", "icinga2.user"), + "icinga2pass" : config.get("icinga2", "icinga2.password"), + "icinga2ca" : config.get("icinga2", "icinga2.ca"), + "icinga2fqdn" : config.get("icinga2", "icinga2.fqdn"), + "icinga2port" : config.getint("icinga2", "icinga2.port") + } +else: + print ("Missing configuration file [/etc/icingabot/icingabot.conf].") + sys.exit() + + +class Icinga2ServiceManager: + notifications= [] + + def build_request_url (self, uri, params={}): + # Since icinga2 wants « URL-encoded strings » but requests + # seems to makes « application/x-www-form-urlencoded », + # so we munge params and shortcut urlencode. + # See https://www.icinga.com/docs/icinga2/latest/doc/12-icinga2-api/#parameters + url = 'https://{}:{}{}'.format(settings["icinga2fqdn"], settings["icinga2port"], uri) + return requests.Request('GET', url, params=params).prepare().url.replace('+', '%20') + + def fetch_notifications (self): + headers = { + 'Accept': 'application/json', + 'X-HTTP-Method-Override': 'GET' + } + data = { + "attrs": [ "last_check_result" ], + "filter": "service.state!=ServiceOK", + } + try: + r = requests.post(self.build_request_url ("/v1/objects/services"), + headers=headers, + auth=(settings["icinga2user"], settings["icinga2pass"]), + data=json.dumps(data), + verify=settings["icinga2ca"]) + if (r.status_code == 200): + new_notifications = [n for n in r.json()['results'] if n != None] + news= [n for n in new_notifications if n['name'] not in [nn['name'] for nn in self.notifications]] + lost= [n for n in self.notifications if n['name'] not in [nn['name'] for nn in new_notifications ]] + self.notifications = new_notifications + return (lost, news) + except: + self.send("Unable to fetch from Icinga2") + return (False,False) + + def ack_service (self, srv, comment, nick): + # weird but needed: + if comment == '': + comment = ' ' + # /weird + headers = { + 'Accept': 'application/json', + 'X-HTTP-Method-Override': 'POST' + } + data = { + "author": nick, + "comment": comment, + } + params = { + "type" : "Service", + "filter" : ('service.__name=="{}"'.format(srv) if srv != None else "service.state!=ServiceOK"), + } + try: + r = requests.post(self.build_request_url ("/v1/actions/acknowledge-problem", params=params), + headers=headers, + auth=(settings["icinga2user"], settings["icinga2pass"]), + data=json.dumps(data), + verify=settings["icinga2ca"]) + if r.status_code == 200: + for a in r.json()['results']: + if a["code"] == 200.0: + self.send (a["status"]) + if srv != None and not r.json()['results']: + self.send("No result for service name « {} »".format(srv)) + else: + self.send("{} for service name « {} »".format(r.text, srv)) + except: + self.send("Unable to post to Icinga2") + + def recheck_service (self, srv): + headers = { + 'Accept': 'application/json', + 'X-HTTP-Method-Override': 'POST' + } + params = { + "type" : "Service", + "filter" : ('service.__name=="{}"'.format(srv) if srv != None else "service.state!=ServiceOK"), + } + try: + r = requests.post(self.build_request_url ("/v1/actions/reschedule-check", params=params), + headers=headers, + auth=(settings["icinga2user"], settings["icinga2pass"]), + verify=settings["icinga2ca"]) + if r.status_code == 200: + for a in r.json()['results']: + if a["code"] == 200.0: + self.send (a["status"]) + if srv != None and not r.json()['results']: + self.send("No result for service name « {} »".format(srv)) + else: + self.send("{} for service name « {} »".format(r.text, srv)) + except: + self.send("Unable to post to Icinga2") + +class IcingaBot(Icinga2ServiceManager, irc.bot.SingleServerIRCBot): + args= '' + muted= False + nick_suffix= '' + + def __init__(self, channel, nickname, server, port=6667): + irc.bot.SingleServerIRCBot.__init__(self, [(server, port)], nickname, nickname, reconnection_interval=300) + self.nick= nickname + self.channel = channel + self.connection.execute_every(30, self.refresh_notifications) + self.refresh_notifications () + + def suffix_nick (self, suffix): + self.nick_suffix= suffix + if self.connection.is_connected(): + self.connection.nick ('{}{}'.format(self.nick, suffix)) + + def unmute (self): + self.muted= False + if self.notifications: + self.suffix_nick ('[{}]'.format(len(self.notifications))) + else: + self.suffix_nick ('') + + def send (self, msg): + if not self.muted and self.connection.is_connected(): + for line in msg.split('\n'): + self.connection.privmsg(self.channel, line) + + def refresh_notifications (self): + lost, news = self.fetch_notifications() + if lost == False and news == False: + return + if self.notifications: + self.suffix_nick ('[{}]'.format(len(self.notifications))) + else: + self.suffix_nick ('') + for srv in lost: + if srv != None: + self.send("{} is OK".format(srv['name'])) + for srv in news: + try: + self.send("{}: => {}".format(srv['name'], srv['attrs']['last_check_result']['output'])) + except: + self.send("{}: => No check result.".format(srv['name'])) + + def on_nicknameinuse(self, c, e): + c.nick( + "_") + + def on_welcome(self, c, e): + c.join(self.channel) + + def on_privmsg(self, c, e): + self.do_command(e, e.arguments[0]) + + def on_pubmsg(self, c, e): + if e.arguments[0].startswith (settings['irccmd']): + self.do_command(e, e.arguments[0][1:]) + return + a = e.arguments[0].split(":", 1) + if len(a) > 1 and a[0].lower() == self.connection.get_nickname().lower(): + self.do_command(e, a[1].strip()) + return + + def do_cmd (self, s): + self.args= None + l = s.split(' ', 1) + cmd= l[0] + if len(l)>1: + self.args= l[1].strip() + + for k,v in commands.items(): + if cmd == k and hasattr(self, "do_"+cmd): + return getattr(self, "do_"+cmd) + if "synonyms" in v: + for s in v["synonyms"]: + if re.match(s, cmd) and hasattr(self, "do_"+k): + return getattr(self, "do_"+k) + return False + + def do_help (self, c, e): + for k,v in commands.items(): + self.send("{} -- {}".format(k, v["help"])) + if "synonyms" in v: + self.send(" synonyms: {}".format(", ".join(v["synonyms"]))) + + def do_die (self, c, e): + print('Ok master, I die. Aaargh...') + self.die () + + def do_leave (self, c, e): + self.disconnect() + + def do_list (self, c, e): + if self.notifications: + for srv in self.notifications: + try: + self.send("{}: => {}".format(srv['name'], srv['attrs']['last_check_result']['output'])) + except: + self.send("{}: => No check result.".format(srv['name'])) + else: + self.send("Nothing particularly exciting.") + + def do_ack (self, c, e): + if self.args == None: + self.send(e.source.nick + ": usage: !ack [: comment]") + return + l = self.args.split(':', 1) + srv, comment= l[0].strip(), '' + if len(l)>1: + comment= l[1].strip() + if srv == 'all': + srv= None + self.ack_service(srv, comment, e.source.nick) + + def do_recheck (self, c, e): + if self.args == 'all': + self.args == None + self.recheck_service(self.args) + self.connection.execute_delayed(1, self.refresh_notifications) + + def do_mute (self, c, e): + self.muted= True + self.suffix_nick ('[zZz]') + self.connection.execute_delayed(3600, self.refresh_notifications) + + + def do_command(self, e, cmd): + nick = e.source.nick + c = self.connection + + if self.do_cmd (cmd): + self.unmute () + (self.do_cmd (cmd)) (c, e) + else: + self.send(nick + ": Not understood: " + cmd) + +def main(): + bot = IcingaBot(settings['ircchan'], settings['ircnick'], settings['ircsrv'], settings['ircport']) + bot.start() + +if __name__ == "__main__": + main()