#!/usr/bin/python3
#
# Simple IRC Bot statusing icinga2 via its API.
#
# François Poulain
#
#
# Based on the example program using irc.bot, from
# Joel Rosdahl
# https://github.com/jaraco/irc/raw/master/scripts/testbot.py

"""A simple IRC Bot statusing icinga2.

It is based on the TestBot example bot that uses the SingleServerIRCBot
class from irc.bot. The bot enters a channel and listens for commands in
private messages and channel traffic. Commands in channel messages are
given by prefixing the text with the configured command prefix, or with
the bot name followed by a colon.

Periodically, the bot reads the Icinga2 status and advertises the number
of KO services as a suffix of its nickname.

Requirements: python3-irc python3-requests

The known commands are described by the ``commands`` table below.
"""

import configparser
import json
import os.path
import re
import sys
from itertools import groupby

import requests
import irc.bot

# Command table: command name -> help text and optional synonyms.
# Synonyms are regular expressions matched against the start of the
# first word of a message (see IcingaBot.do_cmd).
commands = {
    "ack": {"help": "Acknowledge all services matching the query."},
    "recheck": {
        "help": "Recheck all services matching the query.",
        "synonyms": [r"refresh"],
    },
    "list": {"help": "List all KO services.", "synonyms": [r"lsit", r"lits"]},
    "leave": {
        "help": "Disconnect the bot."
        " The bot will try to reconnect after 300 seconds.",
    },
    "mute": {
        "help": "Mute the bot (no more status report)."
        " The bot will unmute after 1 hour or after receiving any command.",
        "synonyms": [
            r"fuck",
            r"chut",
            r"couché",
            r"sieste",
            r"t(a|o)\s*g(ueu|o)le",
        ],
    },
    "die": {"help": "Let the bot cease to exist."},
    "help": {"help": "Print help."},
}

# Seconds to wait for the Icinga2 API before giving up, so that a stalled
# API cannot block the bot forever.
REQUEST_TIMEOUT = 10

# Seconds after which a muted bot unmutes by itself.
MUTE_DURATION = 3600

# Maximum number of consecutive fetch errors reported on the channel.
MAX_REPORTED_ERRORS = 10

# Load configuration.
configurationFilename = "/etc/icingabot/icingabot.conf"
if not os.path.isfile(configurationFilename):
    # sys.exit(<str>) prints the message on stderr and exits with status 1.
    sys.exit("Missing configuration file [{}].".format(configurationFilename))

config = configparser.RawConfigParser()
config.read(configurationFilename)
settings = {
    "ircsrv": config.get("irc", "irc.server"),
    "ircchan": config.get("irc", "irc.chan"),
    "ircport": config.getint("irc", "irc.port"),
    "ircnick": config.get("irc", "irc.nick"),
    "irccmd": config.get("irc", "irc.cmd_prefix"),
    # see /etc/icinga2/conf.d/api-users.conf
    "icinga2user": config.get("icinga2", "icinga2.user"),
    "icinga2pass": config.get("icinga2", "icinga2.password"),
    "icinga2ca": config.get("icinga2", "icinga2.ca"),
    "icinga2fqdn": config.get("icinga2", "icinga2.fqdn"),
    "icinga2port": config.getint("icinga2", "icinga2.port"),
}


class Icinga2ServiceManager:
    """Mixin talking to the Icinga2 REST API.

    The host class must provide a ``send(msg)`` method; it is used to
    report API results and errors.
    """

    # Last known list of non-OK services (always rebound, never mutated
    # in place, so the class-level default is not shared state).
    ko_services = []
    # Number of consecutive fetch errors already reported.
    error_counter = 0

    @staticmethod
    def _escape_filter_string(value):
        """Escape *value* for use inside a double-quoted filter string."""
        return value.replace("\\", "\\\\").replace('"', '\\"')

    def build_request_url(self, uri, params=None):
        """Return the full API URL for *uri* with *params* URL-encoded."""
        # Since icinga2 wants « URL-encoded strings » but requests
        # seems to make « application/x-www-form-urlencoded »,
        # we munge params and shortcut urlencode.
        # See
        # https://www.icinga.com/docs/icinga2/latest/doc/12-icinga2-api/#parameters
        url = "https://{}:{}{}".format(
            settings["icinga2fqdn"], settings["icinga2port"], uri
        )
        return (
            requests.Request("GET", url, params=params)
            .prepare()
            .url.replace("+", "%20")
        )

    def fetch_ko_services(self):
        """Fetch the non-OK services and update ``self.ko_services``.

        Returns a ``(lost, news)`` tuple: the services that left the KO
        list and the ones that entered it since the previous fetch.
        Returns ``(False, False)`` when the status could not be fetched.
        """
        headers = {
            "Accept": "application/json",
            "X-HTTP-Method-Override": "GET",
        }
        data = {
            "attrs": [
                "last_check_result",
                "display_name",
                "host_name",
                "acknowledgement",
            ],
            "filter": "service.state!=ServiceOK",
        }
        try:
            r = requests.post(
                self.build_request_url("/v1/objects/services"),
                headers=headers,
                auth=(settings["icinga2user"], settings["icinga2pass"]),
                data=json.dumps(data),
                verify=settings["icinga2ca"],
                timeout=REQUEST_TIMEOUT,
            )
            if r.status_code != 200:
                # Route unexpected statuses through the error path so
                # callers always get a 2-tuple back.
                raise ValueError(
                    "unexpected HTTP status {}".format(r.status_code)
                )
            new_ko_services = [n for n in r.json()["results"] if n is not None]
            new_names = {n["name"] for n in new_ko_services}
        except Exception as e:
            # Boundary with the network and a remote payload: report (a
            # bounded number of times) and keep the bot running.
            if self.error_counter < MAX_REPORTED_ERRORS:
                self.send("Unable to fetch from Icinga2: {}".format(e))
                self.error_counter += 1
            return (False, False)

        self.error_counter = 0
        old_names = {n["name"] for n in self.ko_services}
        news = [n for n in new_ko_services if n["name"] not in old_names]
        lost = [n for n in self.ko_services if n["name"] not in new_names]
        self.ko_services = new_ko_services
        return (lost, news)

    def post_on_services(self, pattern, uri, data=None):
        """POST an action on every non-OK service matching *pattern*.

        *pattern* is matched as ``*pattern*`` against the service name; a
        falsy pattern targets all non-OK services. *uri* is the API path
        of the action and *data* its JSON payload. Each result status is
        sent with ``self.send``.
        """
        headers = {
            "Accept": "application/json",
            "X-HTTP-Method-Override": "POST",
        }
        params = {"type": "Service", "filter": "service.state!=ServiceOK"}
        if pattern:
            # The pattern comes from IRC users: escape it so it cannot
            # break out of the string literal of the filter expression.
            params["filter"] += '&& match("*{}*", service.__name)'.format(
                self._escape_filter_string(pattern)
            )
        try:
            r = requests.post(
                self.build_request_url(uri, params=params),
                headers=headers,
                auth=(settings["icinga2user"], settings["icinga2pass"]),
                data=json.dumps({} if data is None else data),
                verify=settings["icinga2ca"],
                timeout=REQUEST_TIMEOUT,
            )
            results = r.json()["results"]
            for a in results:
                self.send(a["status"])
            if pattern is not None and not results:
                self.send("No matching service for « {} »".format(pattern))
        except Exception as e:
            self.send("Unable to post to Icinga2: {}".format(e))

    def ack_service(self, pattern, comment, nick):
        """Acknowledge the problems of the services matching *pattern*."""
        data = {
            "author": nick,
            "comment": comment or " ",  # never "" !
        }
        self.post_on_services(pattern, "/v1/actions/acknowledge-problem", data)

    def recheck_service(self, pattern):
        """Reschedule a check of the services matching *pattern*."""
        self.post_on_services(pattern, "/v1/actions/reschedule-check")


class IcingaBot(Icinga2ServiceManager, irc.bot.SingleServerIRCBot):
    """IRC bot exposing the Icinga2 status and actions on a channel."""

    # Arguments of the command being processed (set by do_cmd).
    args = ""
    # When True, send() drops every message.
    muted = False
    # Suffix appended to the nickname by refresh_nick().
    nick_suffix = ""
    # Incremented on each mute, so that a stale wake-up timer from an
    # earlier mute cannot end a more recent one.
    mute_generation = 0

    def __init__(self, channel, nickname, server, port=6667):
        irc.bot.SingleServerIRCBot.__init__(
            self,
            [(server, port)],
            nickname,
            nickname,
            reconnection_interval=300,
        )
        self.nick = nickname
        self.channel = channel
        self.connection.execute_every(30, self.refresh_ko_services)
        self.connection.execute_every(300, self.refresh_nick)
        self.refresh_ko_services()

    def _ko_nick_suffix(self):
        """Return the nick suffix showing the number of KO services."""
        if self.ko_services:
            return "[{}]".format(len(self.ko_services))
        return ""

    def refresh_nick(self):
        """Apply the current nick suffix, when connected."""
        if self.connection.is_connected():
            self.connection.nick("{}{}".format(self.nick, self.nick_suffix))

    def unmute(self):
        """Leave the muted state and restore the KO-count nick suffix."""
        self.muted = False
        self.nick_suffix = self._ko_nick_suffix()

    def send(self, msg):
        """Send *msg* to the channel, one PRIVMSG per line, unless muted."""
        if not self.muted and self.connection.is_connected():
            for line in msg.split("\n"):
                self.connection.privmsg(self.channel, line)

    def refresh_ko_services(self):
        """Refresh the KO services list and the matching nick suffix."""
        lost, news = self.fetch_ko_services()
        if lost is False and news is False:
            return
        # Keep the "[zZz]" suffix set by do_mute while the bot is muted.
        if not self.muted:
            self.nick_suffix = self._ko_nick_suffix()

    def on_nicknameinuse(self, c, e):
        c.nick(c.get_nickname() + "_")

    def on_welcome(self, c, e):
        c.join(self.channel)

    def on_privmsg(self, c, e):
        self.do_command(e, e.arguments[0])

    def on_pubmsg(self, c, e):
        """Run channel messages addressed to the bot as commands.

        A message is addressed to the bot when it starts with the
        configured command prefix or with ``<bot nick>:``.
        """
        message = e.arguments[0]
        prefix = settings["irccmd"]
        # An empty prefix would match every message; ignore it. Strip the
        # whole prefix, which may be longer than one character.
        if prefix and message.startswith(prefix):
            self.do_command(e, message[len(prefix):])
            return
        a = message.split(":", 1)
        my_nick = self.connection.get_nickname().lower()
        if len(a) > 1 and a[0].lower() == my_nick:
            self.do_command(e, a[1].strip())

    def do_cmd(self, s):
        """Resolve the command line *s* to its ``do_*`` handler.

        The first word selects the command, by exact name or by synonym
        regular expression; the rest is stored, stripped, in
        ``self.args`` (``None`` when absent). Returns the bound handler,
        or ``False`` when no command matches.
        """
        self.args = None
        tokens = s.strip().split(" ", 1)
        cmd = tokens[0]
        if len(tokens) > 1:
            self.args = tokens[1].strip()
        for name, spec in commands.items():
            if cmd == name and hasattr(self, "do_" + cmd):
                return getattr(self, "do_" + cmd)
            for synonym in spec.get("synonyms", ()):
                if re.match(synonym, cmd) and hasattr(self, "do_" + name):
                    return getattr(self, "do_" + name)
        return False

    def do_help(self, c, e):
        for k, v in commands.items():
            self.send("{} -- {}".format(k, v["help"]))
            if "synonyms" in v:
                self.send(" synonyms: {}".format(", ".join(v["synonyms"])))

    def do_die(self, c, e):
        print("Ok master, I die. Aaargh...")
        self.die()

    def do_leave(self, c, e):
        self.disconnect()

    def get_unack_ko_services(self):
        """Return the KO services that are not acknowledged."""
        return [
            s for s in self.ko_services if not s["attrs"]["acknowledgement"]
        ]

    def regrouped_ko_services(self):
        """Group unacknowledged KO services by display name.

        Returns a list of ``(display_name, [host_name, ...])`` tuples.
        """

        def regroup_key(elem):
            return elem["attrs"]["display_name"]

        # groupby only merges adjacent items, hence the sort on the same key.
        return [
            (group, [service["attrs"]["host_name"] for service in services])
            for group, services in groupby(
                sorted(self.get_unack_ko_services(), key=regroup_key),
                regroup_key,
            )
        ]

    def do_list(self, c, e):
        """List KO services grouped by name, then the acknowledged ones."""
        if not self.ko_services:
            self.send("Nothing particularly exciting.")
            return

        # Most widespread problems first; host names shortened to their
        # first label.
        host_by_service = [
            (service, [hostname.split(".")[0] for hostname in hosts])
            for service, hosts in sorted(
                self.regrouped_ko_services(), key=lambda x: -len(x[1])
            )
        ]
        # Everything may be acknowledged: do not send an empty message.
        if host_by_service:
            self.send(
                "\n".join(
                    "{} ({}): {}".format(
                        service, len(hostnames), ", ".join(hostnames)
                    )
                    for service, hostnames in host_by_service
                )
            )

        acknowledged = [
            s for s in self.ko_services if s["attrs"]["acknowledgement"]
        ]
        if acknowledged:
            # Deduplicate the names, sorted for a stable output.
            names = sorted({s["attrs"]["display_name"] for s in acknowledged})
            self.send(
                "Acknowledged ({}): {}".format(
                    len(acknowledged), ", ".join(names)
                )
            )

    def do_ack(self, c, e):
        """Acknowledge services: ``ack (PATTERN | all) [: comment]``."""
        pattern, _, comment = (self.args or "").partition(":")
        pattern, comment = pattern.strip(), comment.strip()
        if not pattern:
            # An empty pattern would otherwise acknowledge every service.
            self.send(
                "{}: usage: {}ack (PATTERN | all) [: comment]".format(
                    e.source.nick, settings["irccmd"]
                )
            )
            return
        if pattern == "all":
            pattern = None
        self.ack_service(pattern, comment, e.source.nick)

    def do_recheck(self, c, e):
        """Recheck services matching the arguments (all when omitted)."""
        if self.args == "all":
            self.args = None
        self.recheck_service(self.args)
        # Give Icinga2 some time to run the checks before refreshing.
        self.connection.execute_delayed(15, self.refresh_ko_services)

    def do_mute(self, c, e):
        """Mute the bot until MUTE_DURATION elapses or a command arrives."""
        self.muted = True
        self.nick_suffix = "[zZz]"
        self.mute_generation += 1
        generation = self.mute_generation

        def wake_up():
            # Only end the mute this timer was armed for.
            if self.muted and self.mute_generation == generation:
                self.unmute()

        self.connection.execute_delayed(MUTE_DURATION, wake_up)

    def do_command(self, e, cmd):
        """Execute the command line *cmd* received through event *e*."""
        handler = self.do_cmd(cmd)
        if handler:
            # Any understood command wakes the bot up.
            self.unmute()
            handler(self.connection, e)
        else:
            self.send(e.source.nick + ": Not understood: " + cmd)


def main():
    bot = IcingaBot(
        settings["ircchan"],
        settings["ircnick"],
        settings["ircsrv"],
        settings["ircport"],
    )
    bot.start()


if __name__ == "__main__":
    main()