From 9fdd726de02cc38d555379c523d44bee5eb60d34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Poulain?= Date: Sun, 26 Apr 2020 11:58:57 +0200 Subject: [PATCH] =?UTF-8?q?reformattage=20=C3=A0=20l=20aide=20de=20black?= =?UTF-8?q?=20et=20passage=20de=20flake8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- icingabot.py | 405 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 236 insertions(+), 169 deletions(-) diff --git a/icingabot.py b/icingabot.py index 3eaec44..bc72b26 100755 --- a/icingabot.py +++ b/icingabot.py @@ -1,5 +1,5 @@ #! /usr/bin/env python3 -# +# # Simple IRC Bot statusing icinga2 via its API. # # François Poulain @@ -10,6 +10,17 @@ # Joel Rosdahl # https://github.com/jaraco/irc/raw/master/scripts/testbot.py +import configparser +import json +import os.path +import re +import sys + +import requests + +import irc.bot +import irc.strings + """A simple IRC Bot statusing icinga2. It is based on TestBot example bot that uses the SingleServerIRCBot class from @@ -23,41 +34,33 @@ Requirements: python3-irc python3-requests The known commands are: """ commands = { - "ack" : { - "help": "Acknowledge a given service.", - }, - "recheck" : { - "help": "Recheck a given service or all services.", - "synonyms" : [r"refresh"], - }, - "list" : { - "help": "List all KO services.", - "synonyms" : [r"lsit", r"lits"], - }, - "leave" : { - "help": "Disconnect the bot. The bot will try to reconnect after 300 seconds.", - }, - "mute" : { - "help": "Mute the bot (no more status report). The bot will unmute after 1 hour or after receiving any command.", - "synonyms" : [r"fuck", r"chut", r"couché", r"sieste", r"t(a|o)\s*g(ueu|o)le"], - }, - "die" : { - "help": "Let the bot cease to exist.", - }, - "help" : { - "help": "Print help.", - }, - } - -import os.path -import irc.bot -import irc.strings -import re -import requests, json -import configparser + "ack": {"help": "Acknowledge a given service."}, + "recheck": { + "help": "Recheck a given service or all services.", + "synonyms": [r"refresh"], + }, + "list": {"help": "List all KO services.", "synonyms": [r"lsit", r"lits"]}, + "leave": { + "help": "Disconnect the bot." + " The bot will try to reconnect after 300 seconds.", + }, + "mute": { + "help": "Mute the bot (no more status report)." + " The bot will unmute after 1 hour or after receiving any command.", + "synonyms": [ + r"fuck", + r"chut", + r"couché", + r"sieste", + r"t(a|o)\s*g(ueu|o)le", + ], + }, + "die": {"help": "Let the bot cease to exist."}, + "help": {"help": "Print help."}, +} # Load configuration. -configurationFilename="/etc/icingabot/icingabot.conf" +configurationFilename = "/etc/icingabot/icingabot.conf" if os.path.isfile(configurationFilename): config = configparser.RawConfigParser() config.read(configurationFilename) @@ -68,162 +71,210 @@ if os.path.isfile(configurationFilename): "ircport": config.getint("irc", "irc.port"), "ircnick": config.get("irc", "irc.nick"), "irccmd": config.get("irc", "irc.cmd_prefix"), - # see /etc/icinga2/conf.d/api-users.conf - "icinga2user" : config.get("icinga2", "icinga2.user"), - "icinga2pass" : config.get("icinga2", "icinga2.password"), - "icinga2ca" : config.get("icinga2", "icinga2.ca"), - "icinga2fqdn" : config.get("icinga2", "icinga2.fqdn"), - "icinga2port" : config.getint("icinga2", "icinga2.port") + "icinga2user": config.get("icinga2", "icinga2.user"), + "icinga2pass": config.get("icinga2", "icinga2.password"), + "icinga2ca": config.get("icinga2", "icinga2.ca"), + "icinga2fqdn": config.get("icinga2", "icinga2.fqdn"), + "icinga2port": config.getint("icinga2", "icinga2.port"), } else: - print ("Missing configuration file [/etc/icingabot/icingabot.conf].") + print("Missing configuration file [/etc/icingabot/icingabot.conf].") sys.exit() - + class Icinga2ServiceManager: - notifications= [] + notifications = [] - def build_request_url (self, uri, params={}): + def build_request_url(self, uri, params={}): # Since icinga2 wants « URL-encoded strings » but requests # seems to makes « application/x-www-form-urlencoded », # so we munge params and shortcut urlencode. - # See https://www.icinga.com/docs/icinga2/latest/doc/12-icinga2-api/#parameters - url = 'https://{}:{}{}'.format(settings["icinga2fqdn"], settings["icinga2port"], uri) - return requests.Request('GET', url, params=params).prepare().url.replace('+', '%20') + # See + # https://www.icinga.com/docs/icinga2/latest/doc/12-icinga2-api/#parameters + url = "https://{}:{}{}".format( + settings["icinga2fqdn"], settings["icinga2port"], uri + ) + return ( + requests.Request("GET", url, params=params) + .prepare() + .url.replace("+", "%20") + ) - def fetch_notifications (self): + def fetch_notifications(self): headers = { - 'Accept': 'application/json', - 'X-HTTP-Method-Override': 'GET' - } + "Accept": "application/json", + "X-HTTP-Method-Override": "GET", + } data = { - "attrs": [ "last_check_result" ], - "filter": "service.state!=ServiceOK", - } + "attrs": ["last_check_result"], + "filter": "service.state!=ServiceOK", + } try: - r = requests.post(self.build_request_url ("/v1/objects/services"), - headers=headers, - auth=(settings["icinga2user"], settings["icinga2pass"]), - data=json.dumps(data), - verify=settings["icinga2ca"]) - if (r.status_code == 200): - new_notifications = [n for n in r.json()['results'] if n != None] - news= [n for n in new_notifications if n['name'] not in [nn['name'] for nn in self.notifications]] - lost= [n for n in self.notifications if n['name'] not in [nn['name'] for nn in new_notifications ]] + r = requests.post( + self.build_request_url("/v1/objects/services"), + headers=headers, + auth=(settings["icinga2user"], settings["icinga2pass"]), + data=json.dumps(data), + verify=settings["icinga2ca"], + ) + if r.status_code == 200: + new_notifications = [ + n for n in r.json()["results"] if n is not None + ] + news = [ + n + for n in new_notifications + if n["name"] + not in [nn["name"] for nn in self.notifications] + ] + lost = [ + n + for n in self.notifications + if n["name"] + not in [nn["name"] for nn in new_notifications] + ] self.notifications = new_notifications return (lost, news) - except: + except Exception: self.send("Unable to fetch from Icinga2") - return (False,False) + return (False, False) - def ack_service (self, srv, comment, nick): + def ack_service(self, srv, comment, nick): # weird but needed: - if comment == '': - comment = ' ' + if comment == "": + comment = " " # /weird headers = { - 'Accept': 'application/json', - 'X-HTTP-Method-Override': 'POST' - } + "Accept": "application/json", + "X-HTTP-Method-Override": "POST", + } data = { - "author": nick, - "comment": comment, - } + "author": nick, + "comment": comment, + } params = { - "type" : "Service", - "filter" : ('service.__name=="{}"'.format(srv) if srv != None else "service.state!=ServiceOK"), - } + "type": "Service", + "filter": ( + 'service.__name=="{}"'.format(srv) + if srv is not None + else "service.state!=ServiceOK" + ), + } try: - r = requests.post(self.build_request_url ("/v1/actions/acknowledge-problem", params=params), - headers=headers, - auth=(settings["icinga2user"], settings["icinga2pass"]), - data=json.dumps(data), - verify=settings["icinga2ca"]) + r = requests.post( + self.build_request_url( + "/v1/actions/acknowledge-problem", params=params + ), + headers=headers, + auth=(settings["icinga2user"], settings["icinga2pass"]), + data=json.dumps(data), + verify=settings["icinga2ca"], + ) if r.status_code == 200: - for a in r.json()['results']: + for a in r.json()["results"]: if a["code"] == 200.0: - self.send (a["status"]) - if srv != None and not r.json()['results']: + self.send(a["status"]) + if srv is not None and not r.json()["results"]: self.send("No result for service name « {} »".format(srv)) else: self.send("{} for service name « {} »".format(r.text, srv)) - except: + except Exception: self.send("Unable to post to Icinga2") - def recheck_service (self, srv): + def recheck_service(self, srv): headers = { - 'Accept': 'application/json', - 'X-HTTP-Method-Override': 'POST' - } + "Accept": "application/json", + "X-HTTP-Method-Override": "POST", + } params = { - "type" : "Service", - "filter" : ('service.__name=="{}"'.format(srv) if srv != None else "service.state!=ServiceOK"), - } + "type": "Service", + "filter": ( + 'service.__name=="{}"'.format(srv) + if srv is not None + else "service.state!=ServiceOK" + ), + } try: - r = requests.post(self.build_request_url ("/v1/actions/reschedule-check", params=params), - headers=headers, - auth=(settings["icinga2user"], settings["icinga2pass"]), - verify=settings["icinga2ca"]) + r = requests.post( + self.build_request_url( + "/v1/actions/reschedule-check", params=params + ), + headers=headers, + auth=(settings["icinga2user"], settings["icinga2pass"]), + verify=settings["icinga2ca"], + ) if r.status_code == 200: - for a in r.json()['results']: + for a in r.json()["results"]: if a["code"] == 200.0: - self.send (a["status"]) - if srv != None and not r.json()['results']: + self.send(a["status"]) + if srv is not None and not r.json()["results"]: self.send("No result for service name « {} »".format(srv)) else: self.send("{} for service name « {} »".format(r.text, srv)) - except: + except Exception: self.send("Unable to post to Icinga2") + class IcingaBot(Icinga2ServiceManager, irc.bot.SingleServerIRCBot): - args= '' - muted= False - nick_suffix= '' + args = "" + muted = False + nick_suffix = "" def __init__(self, channel, nickname, server, port=6667): - irc.bot.SingleServerIRCBot.__init__(self, [(server, port)], nickname, nickname, reconnection_interval=300) - self.nick= nickname + irc.bot.SingleServerIRCBot.__init__( + self, + [(server, port)], + nickname, + nickname, + reconnection_interval=300, + ) + self.nick = nickname self.channel = channel self.connection.execute_every(30, self.refresh_notifications) - self.refresh_notifications () + self.refresh_notifications() - def suffix_nick (self, suffix): - self.nick_suffix= suffix + def suffix_nick(self, suffix): + self.nick_suffix = suffix if self.connection.is_connected(): - self.connection.nick ('{}{}'.format(self.nick, suffix)) + self.connection.nick("{}{}".format(self.nick, suffix)) - def unmute (self): - self.muted= False + def unmute(self): + self.muted = False if self.notifications: - self.suffix_nick ('[{}]'.format(len(self.notifications))) + self.suffix_nick("[{}]".format(len(self.notifications))) else: - self.suffix_nick ('') + self.suffix_nick("") - def send (self, msg): + def send(self, msg): if not self.muted and self.connection.is_connected(): - for line in msg.split('\n'): + for line in msg.split("\n"): self.connection.privmsg(self.channel, line) - def refresh_notifications (self): + def refresh_notifications(self): lost, news = self.fetch_notifications() - if lost == False and news == False: + if lost is False and news is False: return if self.notifications: - self.suffix_nick ('[{}]'.format(len(self.notifications))) + self.suffix_nick("[{}]".format(len(self.notifications))) else: - self.suffix_nick ('') + self.suffix_nick("") for srv in lost: - if srv != None: - self.send("{} is OK".format(srv['name'])) + if srv is not None: + self.send("{} is OK".format(srv["name"])) for srv in news: try: - self.send("{}: => {}".format(srv['name'], srv['attrs']['last_check_result']['output'])) - except: - self.send("{}: => No check result.".format(srv['name'])) + self.send( + "{}: => {}".format( + srv["name"], + srv["attrs"]["last_check_result"]["output"], + ) + ) + except Exception: + self.send("{}: => No check result.".format(srv["name"])) def on_nicknameinuse(self, c, e): - c.nick( + "_") + c.nick(+"_") def on_welcome(self, c, e): c.join(self.channel) @@ -232,90 +283,106 @@ class IcingaBot(Icinga2ServiceManager, irc.bot.SingleServerIRCBot): self.do_command(e, e.arguments[0]) def on_pubmsg(self, c, e): - if e.arguments[0].startswith (settings['irccmd']): + if e.arguments[0].startswith(settings["irccmd"]): self.do_command(e, e.arguments[0][1:]) return a = e.arguments[0].split(":", 1) - if len(a) > 1 and a[0].lower() == self.connection.get_nickname().lower(): + if ( + len(a) > 1 + and a[0].lower() == self.connection.get_nickname().lower() + ): self.do_command(e, a[1].strip()) return - def do_cmd (self, s): - self.args= None - l = s.split(' ', 1) - cmd= l[0] - if len(l)>1: - self.args= l[1].strip() + def do_cmd(self, s): + self.args = None + tokens = s.split(" ", 1) + cmd = tokens[0] + if len(tokens) > 1: + self.args = tokens[1].strip() - for k,v in commands.items(): - if cmd == k and hasattr(self, "do_"+cmd): - return getattr(self, "do_"+cmd) + for k, v in commands.items(): + if cmd == k and hasattr(self, "do_" + cmd): + return getattr(self, "do_" + cmd) if "synonyms" in v: for s in v["synonyms"]: - if re.match(s, cmd) and hasattr(self, "do_"+k): - return getattr(self, "do_"+k) + if re.match(s, cmd) and hasattr(self, "do_" + k): + return getattr(self, "do_" + k) return False - def do_help (self, c, e): - for k,v in commands.items(): + def do_help(self, c, e): + for k, v in commands.items(): self.send("{} -- {}".format(k, v["help"])) if "synonyms" in v: self.send(" synonyms: {}".format(", ".join(v["synonyms"]))) - def do_die (self, c, e): - print('Ok master, I die. Aaargh...') - self.die () + def do_die(self, c, e): + print("Ok master, I die. Aaargh...") + self.die() - def do_leave (self, c, e): + def do_leave(self, c, e): self.disconnect() - def do_list (self, c, e): + def do_list(self, c, e): if self.notifications: for srv in self.notifications: try: - self.send("{}: => {}".format(srv['name'], srv['attrs']['last_check_result']['output'])) - except: - self.send("{}: => No check result.".format(srv['name'])) + self.send( + "{}: => {}".format( + srv["name"], + srv["attrs"]["last_check_result"]["output"], + ) + ) + except Exception: + self.send("{}: => No check result.".format(srv["name"])) else: self.send("Nothing particularly exciting.") - def do_ack (self, c, e): - if self.args == None: - self.send(e.source.nick + ": usage: !ack [: comment]") + def do_ack(self, c, e): + if self.args is None: + self.send( + e.source.nick + ": usage: !ack [: comment]" + ) return - l = self.args.split(':', 1) - srv, comment= l[0].strip(), '' - if len(l)>1: - comment= l[1].strip() - if srv == 'all': - srv= None + tokens = self.args.split(":", 1) + srv, comment = tokens[0].strip(), "" + if len(tokens) > 1: + comment = tokens[1].strip() + if srv == "all": + srv = None self.ack_service(srv, comment, e.source.nick) - def do_recheck (self, c, e): - if self.args == 'all': - self.args == None + def do_recheck(self, c, e): + if self.args == "all": + self.args = None self.recheck_service(self.args) self.connection.execute_delayed(1, self.refresh_notifications) - def do_mute (self, c, e): - self.muted= True - self.suffix_nick ('[zZz]') + def do_mute(self, c, e): + self.muted = True + self.suffix_nick("[zZz]") self.connection.execute_delayed(3600, self.refresh_notifications) - def do_command(self, e, cmd): nick = e.source.nick c = self.connection - if self.do_cmd (cmd): - self.unmute () - (self.do_cmd (cmd)) (c, e) + if self.do_cmd(cmd): + self.unmute() + (self.do_cmd(cmd))(c, e) else: self.send(nick + ": Not understood: " + cmd) + def main(): - bot = IcingaBot(settings['ircchan'], settings['ircnick'], settings['ircsrv'], settings['ircport']) + bot = IcingaBot( + settings["ircchan"], + settings["ircnick"], + settings["ircsrv"], + settings["ircport"], + ) bot.start() + if __name__ == "__main__": main()