#! /usr/bin/env python3 # # Simple IRC Bot statusing icinga2 via its API. # # François Poulain # # # Based on example program using irc.bot. # from # Joel Rosdahl # https://github.com/jaraco/irc/raw/master/scripts/testbot.py """A simple IRC Bot statusing icinga2. It is based on TestBot example bot that uses the SingleServerIRCBot class from irc.bot. The bot enters a channel and listens for commands in private messages and channel traffic. Commands in channel messages are given by prefixing the text by the bot name followed by a colon. Periodically, the bot read Icinga2 status and report all new KO services. Requirements: python3-irc python3-requests The known commands are: """ commands = { "ack" : { "help": "Acknowledge a given service.", }, "recheck" : { "help": "Recheck a given service or all services.", "synonyms" : [r"refresh"], }, "list" : { "help": "List all KO services.", "synonyms" : [r"lsit", r"lits"], }, "leave" : { "help": "Disconnect the bot. The bot will try to reconnect after 300 seconds.", }, "mute" : { "help": "Mute the bot (no more status report). The bot will unmute after 1 hour or after receiving any command.", "synonyms" : [r"fuck", r"chut", r"couché", r"sieste", r"t(a|o)\s*g(ueu|o)le"], }, "die" : { "help": "Let the bot cease to exist.", }, "help" : { "help": "Print help.", }, } import os.path import irc.bot import irc.strings import re import requests, json import configparser # Load configuration. configurationFilename="/etc/icingabot/icingabot.conf" if os.path.isfile(configurationFilename): config = configparser.RawConfigParser() config.read(configurationFilename) settings = { "ircsrv": config.get("irc", "irc.server"), "ircchan": config.get("irc", "irc.chan"), "ircport": config.getint("irc", "irc.port"), "ircnick": config.get("irc", "irc.nick"), "irccmd": config.get("irc", "irc.cmd_prefix"), # see /etc/icinga2/conf.d/api-users.conf "icinga2user" : config.get("icinga2", "icinga2.user"), "icinga2pass" : config.get("icinga2", "icinga2.password"), "icinga2ca" : config.get("icinga2", "icinga2.ca"), "icinga2fqdn" : config.get("icinga2", "icinga2.fqdn"), "icinga2port" : config.getint("icinga2", "icinga2.port") } else: print ("Missing configuration file [/etc/icingabot/icingabot.conf].") sys.exit() class Icinga2ServiceManager: notifications= [] def build_request_url (self, uri, params={}): # Since icinga2 wants « URL-encoded strings » but requests # seems to makes « application/x-www-form-urlencoded », # so we munge params and shortcut urlencode. # See https://www.icinga.com/docs/icinga2/latest/doc/12-icinga2-api/#parameters url = 'https://{}:{}{}'.format(settings["icinga2fqdn"], settings["icinga2port"], uri) return requests.Request('GET', url, params=params).prepare().url.replace('+', '%20') def fetch_notifications (self): headers = { 'Accept': 'application/json', 'X-HTTP-Method-Override': 'GET' } data = { "attrs": [ "last_check_result" ], "filter": "service.state!=ServiceOK", } try: r = requests.post(self.build_request_url ("/v1/objects/services"), headers=headers, auth=(settings["icinga2user"], settings["icinga2pass"]), data=json.dumps(data), verify=settings["icinga2ca"]) if (r.status_code == 200): new_notifications = [n for n in r.json()['results'] if n != None] news= [n for n in new_notifications if n['name'] not in [nn['name'] for nn in self.notifications]] lost= [n for n in self.notifications if n['name'] not in [nn['name'] for nn in new_notifications ]] self.notifications = new_notifications return (lost, news) except: self.send("Unable to fetch from Icinga2") return (False,False) def ack_service (self, srv, comment, nick): # weird but needed: if comment == '': comment = ' ' # /weird headers = { 'Accept': 'application/json', 'X-HTTP-Method-Override': 'POST' } data = { "author": nick, "comment": comment, } params = { "type" : "Service", "filter" : ('service.__name=="{}"'.format(srv) if srv != None else "service.state!=ServiceOK"), } try: r = requests.post(self.build_request_url ("/v1/actions/acknowledge-problem", params=params), headers=headers, auth=(settings["icinga2user"], settings["icinga2pass"]), data=json.dumps(data), verify=settings["icinga2ca"]) if r.status_code == 200: for a in r.json()['results']: if a["code"] == 200.0: self.send (a["status"]) if srv != None and not r.json()['results']: self.send("No result for service name « {} »".format(srv)) else: self.send("{} for service name « {} »".format(r.text, srv)) except: self.send("Unable to post to Icinga2") def recheck_service (self, srv): headers = { 'Accept': 'application/json', 'X-HTTP-Method-Override': 'POST' } params = { "type" : "Service", "filter" : ('service.__name=="{}"'.format(srv) if srv != None else "service.state!=ServiceOK"), } try: r = requests.post(self.build_request_url ("/v1/actions/reschedule-check", params=params), headers=headers, auth=(settings["icinga2user"], settings["icinga2pass"]), verify=settings["icinga2ca"]) if r.status_code == 200: for a in r.json()['results']: if a["code"] == 200.0: self.send (a["status"]) if srv != None and not r.json()['results']: self.send("No result for service name « {} »".format(srv)) else: self.send("{} for service name « {} »".format(r.text, srv)) except: self.send("Unable to post to Icinga2") class IcingaBot(Icinga2ServiceManager, irc.bot.SingleServerIRCBot): args= '' muted= False nick_suffix= '' def __init__(self, channel, nickname, server, port=6667): irc.bot.SingleServerIRCBot.__init__(self, [(server, port)], nickname, nickname, reconnection_interval=300) self.nick= nickname self.channel = channel self.connection.execute_every(30, self.refresh_notifications) self.refresh_notifications () def suffix_nick (self, suffix): self.nick_suffix= suffix if self.connection.is_connected(): self.connection.nick ('{}{}'.format(self.nick, suffix)) def unmute (self): self.muted= False if self.notifications: self.suffix_nick ('[{}]'.format(len(self.notifications))) else: self.suffix_nick ('') def send (self, msg): if not self.muted and self.connection.is_connected(): for line in msg.split('\n'): self.connection.privmsg(self.channel, line) def refresh_notifications (self): lost, news = self.fetch_notifications() if lost == False and news == False: return if self.notifications: self.suffix_nick ('[{}]'.format(len(self.notifications))) else: self.suffix_nick ('') for srv in lost: if srv != None: self.send("{} is OK".format(srv['name'])) for srv in news: try: self.send("{}: => {}".format(srv['name'], srv['attrs']['last_check_result']['output'])) except: self.send("{}: => No check result.".format(srv['name'])) def on_nicknameinuse(self, c, e): c.nick( + "_") def on_welcome(self, c, e): c.join(self.channel) def on_privmsg(self, c, e): self.do_command(e, e.arguments[0]) def on_pubmsg(self, c, e): if e.arguments[0].startswith (settings['irccmd']): self.do_command(e, e.arguments[0][1:]) return a = e.arguments[0].split(":", 1) if len(a) > 1 and a[0].lower() == self.connection.get_nickname().lower(): self.do_command(e, a[1].strip()) return def do_cmd (self, s): self.args= None l = s.split(' ', 1) cmd= l[0] if len(l)>1: self.args= l[1].strip() for k,v in commands.items(): if cmd == k and hasattr(self, "do_"+cmd): return getattr(self, "do_"+cmd) if "synonyms" in v: for s in v["synonyms"]: if re.match(s, cmd) and hasattr(self, "do_"+k): return getattr(self, "do_"+k) return False def do_help (self, c, e): for k,v in commands.items(): self.send("{} -- {}".format(k, v["help"])) if "synonyms" in v: self.send(" synonyms: {}".format(", ".join(v["synonyms"]))) def do_die (self, c, e): print('Ok master, I die. Aaargh...') self.die () def do_leave (self, c, e): self.disconnect() def do_list (self, c, e): if self.notifications: for srv in self.notifications: try: self.send("{}: => {}".format(srv['name'], srv['attrs']['last_check_result']['output'])) except: self.send("{}: => No check result.".format(srv['name'])) else: self.send("Nothing particularly exciting.") def do_ack (self, c, e): if self.args == None: self.send(e.source.nick + ": usage: !ack [: comment]") return l = self.args.split(':', 1) srv, comment= l[0].strip(), '' if len(l)>1: comment= l[1].strip() if srv == 'all': srv= None self.ack_service(srv, comment, e.source.nick) def do_recheck (self, c, e): if self.args == 'all': self.args == None self.recheck_service(self.args) self.connection.execute_delayed(1, self.refresh_notifications) def do_mute (self, c, e): self.muted= True self.suffix_nick ('[zZz]') self.connection.execute_delayed(3600, self.refresh_notifications) def do_command(self, e, cmd): nick = e.source.nick c = self.connection if self.do_cmd (cmd): self.unmute () (self.do_cmd (cmd)) (c, e) else: self.send(nick + ": Not understood: " + cmd) def main(): bot = IcingaBot(settings['ircchan'], settings['ircnick'], settings['ircsrv'], settings['ircport']) bot.start() if __name__ == "__main__": main()