reformattage à l aide de black et passage de flake8

This commit is contained in:
François Poulain 2020-04-26 11:58:57 +02:00
parent d218d69b97
commit 9fdd726de0
1 changed files with 236 additions and 169 deletions

View File

@ -1,5 +1,5 @@
#! /usr/bin/env python3
#
#
# Simple IRC Bot statusing icinga2 via its API.
#
# François Poulain <fpoulain@metrodore.fr>
@ -10,6 +10,17 @@
# Joel Rosdahl <joel@rosdahl.net>
# https://github.com/jaraco/irc/raw/master/scripts/testbot.py
import configparser
import json
import os.path
import re
import sys
import requests
import irc.bot
import irc.strings
"""A simple IRC Bot statusing icinga2.
It is based on TestBot example bot that uses the SingleServerIRCBot class from
@ -23,41 +34,33 @@ Requirements: python3-irc python3-requests
The known commands are:
"""
commands = {
"ack" : {
"help": "Acknowledge a given service.",
},
"recheck" : {
"help": "Recheck a given service or all services.",
"synonyms" : [r"refresh"],
},
"list" : {
"help": "List all KO services.",
"synonyms" : [r"lsit", r"lits"],
},
"leave" : {
"help": "Disconnect the bot. The bot will try to reconnect after 300 seconds.",
},
"mute" : {
"help": "Mute the bot (no more status report). The bot will unmute after 1 hour or after receiving any command.",
"synonyms" : [r"fuck", r"chut", r"couché", r"sieste", r"t(a|o)\s*g(ueu|o)le"],
},
"die" : {
"help": "Let the bot cease to exist.",
},
"help" : {
"help": "Print help.",
},
}
import os.path
import irc.bot
import irc.strings
import re
import requests, json
import configparser
"ack": {"help": "Acknowledge a given service."},
"recheck": {
"help": "Recheck a given service or all services.",
"synonyms": [r"refresh"],
},
"list": {"help": "List all KO services.", "synonyms": [r"lsit", r"lits"]},
"leave": {
"help": "Disconnect the bot."
" The bot will try to reconnect after 300 seconds.",
},
"mute": {
"help": "Mute the bot (no more status report)."
" The bot will unmute after 1 hour or after receiving any command.",
"synonyms": [
r"fuck",
r"chut",
r"couché",
r"sieste",
r"t(a|o)\s*g(ueu|o)le",
],
},
"die": {"help": "Let the bot cease to exist."},
"help": {"help": "Print help."},
}
# Load configuration.
configurationFilename="/etc/icingabot/icingabot.conf"
configurationFilename = "/etc/icingabot/icingabot.conf"
if os.path.isfile(configurationFilename):
config = configparser.RawConfigParser()
config.read(configurationFilename)
@ -68,162 +71,210 @@ if os.path.isfile(configurationFilename):
"ircport": config.getint("irc", "irc.port"),
"ircnick": config.get("irc", "irc.nick"),
"irccmd": config.get("irc", "irc.cmd_prefix"),
# see /etc/icinga2/conf.d/api-users.conf
"icinga2user" : config.get("icinga2", "icinga2.user"),
"icinga2pass" : config.get("icinga2", "icinga2.password"),
"icinga2ca" : config.get("icinga2", "icinga2.ca"),
"icinga2fqdn" : config.get("icinga2", "icinga2.fqdn"),
"icinga2port" : config.getint("icinga2", "icinga2.port")
"icinga2user": config.get("icinga2", "icinga2.user"),
"icinga2pass": config.get("icinga2", "icinga2.password"),
"icinga2ca": config.get("icinga2", "icinga2.ca"),
"icinga2fqdn": config.get("icinga2", "icinga2.fqdn"),
"icinga2port": config.getint("icinga2", "icinga2.port"),
}
else:
print ("Missing configuration file [/etc/icingabot/icingabot.conf].")
print("Missing configuration file [/etc/icingabot/icingabot.conf].")
sys.exit()
class Icinga2ServiceManager:
notifications= []
notifications = []
def build_request_url (self, uri, params={}):
def build_request_url(self, uri, params={}):
# Since icinga2 wants « URL-encoded strings » but requests
# seems to makes « application/x-www-form-urlencoded »,
# so we munge params and shortcut urlencode.
# See https://www.icinga.com/docs/icinga2/latest/doc/12-icinga2-api/#parameters
url = 'https://{}:{}{}'.format(settings["icinga2fqdn"], settings["icinga2port"], uri)
return requests.Request('GET', url, params=params).prepare().url.replace('+', '%20')
# See
# https://www.icinga.com/docs/icinga2/latest/doc/12-icinga2-api/#parameters
url = "https://{}:{}{}".format(
settings["icinga2fqdn"], settings["icinga2port"], uri
)
return (
requests.Request("GET", url, params=params)
.prepare()
.url.replace("+", "%20")
)
def fetch_notifications (self):
def fetch_notifications(self):
headers = {
'Accept': 'application/json',
'X-HTTP-Method-Override': 'GET'
}
"Accept": "application/json",
"X-HTTP-Method-Override": "GET",
}
data = {
"attrs": [ "last_check_result" ],
"filter": "service.state!=ServiceOK",
}
"attrs": ["last_check_result"],
"filter": "service.state!=ServiceOK",
}
try:
r = requests.post(self.build_request_url ("/v1/objects/services"),
headers=headers,
auth=(settings["icinga2user"], settings["icinga2pass"]),
data=json.dumps(data),
verify=settings["icinga2ca"])
if (r.status_code == 200):
new_notifications = [n for n in r.json()['results'] if n != None]
news= [n for n in new_notifications if n['name'] not in [nn['name'] for nn in self.notifications]]
lost= [n for n in self.notifications if n['name'] not in [nn['name'] for nn in new_notifications ]]
r = requests.post(
self.build_request_url("/v1/objects/services"),
headers=headers,
auth=(settings["icinga2user"], settings["icinga2pass"]),
data=json.dumps(data),
verify=settings["icinga2ca"],
)
if r.status_code == 200:
new_notifications = [
n for n in r.json()["results"] if n is not None
]
news = [
n
for n in new_notifications
if n["name"]
not in [nn["name"] for nn in self.notifications]
]
lost = [
n
for n in self.notifications
if n["name"]
not in [nn["name"] for nn in new_notifications]
]
self.notifications = new_notifications
return (lost, news)
except:
except Exception:
self.send("Unable to fetch from Icinga2")
return (False,False)
return (False, False)
def ack_service (self, srv, comment, nick):
def ack_service(self, srv, comment, nick):
# weird but needed:
if comment == '':
comment = ' '
if comment == "":
comment = " "
# /weird
headers = {
'Accept': 'application/json',
'X-HTTP-Method-Override': 'POST'
}
"Accept": "application/json",
"X-HTTP-Method-Override": "POST",
}
data = {
"author": nick,
"comment": comment,
}
"author": nick,
"comment": comment,
}
params = {
"type" : "Service",
"filter" : ('service.__name=="{}"'.format(srv) if srv != None else "service.state!=ServiceOK"),
}
"type": "Service",
"filter": (
'service.__name=="{}"'.format(srv)
if srv is not None
else "service.state!=ServiceOK"
),
}
try:
r = requests.post(self.build_request_url ("/v1/actions/acknowledge-problem", params=params),
headers=headers,
auth=(settings["icinga2user"], settings["icinga2pass"]),
data=json.dumps(data),
verify=settings["icinga2ca"])
r = requests.post(
self.build_request_url(
"/v1/actions/acknowledge-problem", params=params
),
headers=headers,
auth=(settings["icinga2user"], settings["icinga2pass"]),
data=json.dumps(data),
verify=settings["icinga2ca"],
)
if r.status_code == 200:
for a in r.json()['results']:
for a in r.json()["results"]:
if a["code"] == 200.0:
self.send (a["status"])
if srv != None and not r.json()['results']:
self.send(a["status"])
if srv is not None and not r.json()["results"]:
self.send("No result for service name « {} »".format(srv))
else:
self.send("{} for service name « {} »".format(r.text, srv))
except:
except Exception:
self.send("Unable to post to Icinga2")
def recheck_service (self, srv):
def recheck_service(self, srv):
headers = {
'Accept': 'application/json',
'X-HTTP-Method-Override': 'POST'
}
"Accept": "application/json",
"X-HTTP-Method-Override": "POST",
}
params = {
"type" : "Service",
"filter" : ('service.__name=="{}"'.format(srv) if srv != None else "service.state!=ServiceOK"),
}
"type": "Service",
"filter": (
'service.__name=="{}"'.format(srv)
if srv is not None
else "service.state!=ServiceOK"
),
}
try:
r = requests.post(self.build_request_url ("/v1/actions/reschedule-check", params=params),
headers=headers,
auth=(settings["icinga2user"], settings["icinga2pass"]),
verify=settings["icinga2ca"])
r = requests.post(
self.build_request_url(
"/v1/actions/reschedule-check", params=params
),
headers=headers,
auth=(settings["icinga2user"], settings["icinga2pass"]),
verify=settings["icinga2ca"],
)
if r.status_code == 200:
for a in r.json()['results']:
for a in r.json()["results"]:
if a["code"] == 200.0:
self.send (a["status"])
if srv != None and not r.json()['results']:
self.send(a["status"])
if srv is not None and not r.json()["results"]:
self.send("No result for service name « {} »".format(srv))
else:
self.send("{} for service name « {} »".format(r.text, srv))
except:
except Exception:
self.send("Unable to post to Icinga2")
class IcingaBot(Icinga2ServiceManager, irc.bot.SingleServerIRCBot):
args= ''
muted= False
nick_suffix= ''
args = ""
muted = False
nick_suffix = ""
def __init__(self, channel, nickname, server, port=6667):
irc.bot.SingleServerIRCBot.__init__(self, [(server, port)], nickname, nickname, reconnection_interval=300)
self.nick= nickname
irc.bot.SingleServerIRCBot.__init__(
self,
[(server, port)],
nickname,
nickname,
reconnection_interval=300,
)
self.nick = nickname
self.channel = channel
self.connection.execute_every(30, self.refresh_notifications)
self.refresh_notifications ()
self.refresh_notifications()
def suffix_nick (self, suffix):
self.nick_suffix= suffix
def suffix_nick(self, suffix):
self.nick_suffix = suffix
if self.connection.is_connected():
self.connection.nick ('{}{}'.format(self.nick, suffix))
self.connection.nick("{}{}".format(self.nick, suffix))
def unmute (self):
self.muted= False
def unmute(self):
self.muted = False
if self.notifications:
self.suffix_nick ('[{}]'.format(len(self.notifications)))
self.suffix_nick("[{}]".format(len(self.notifications)))
else:
self.suffix_nick ('')
self.suffix_nick("")
def send (self, msg):
def send(self, msg):
if not self.muted and self.connection.is_connected():
for line in msg.split('\n'):
for line in msg.split("\n"):
self.connection.privmsg(self.channel, line)
def refresh_notifications (self):
def refresh_notifications(self):
lost, news = self.fetch_notifications()
if lost == False and news == False:
if lost is False and news is False:
return
if self.notifications:
self.suffix_nick ('[{}]'.format(len(self.notifications)))
self.suffix_nick("[{}]".format(len(self.notifications)))
else:
self.suffix_nick ('')
self.suffix_nick("")
for srv in lost:
if srv != None:
self.send("{} is OK".format(srv['name']))
if srv is not None:
self.send("{} is OK".format(srv["name"]))
for srv in news:
try:
self.send("{}: => {}".format(srv['name'], srv['attrs']['last_check_result']['output']))
except:
self.send("{}: => No check result.".format(srv['name']))
self.send(
"{}: => {}".format(
srv["name"],
srv["attrs"]["last_check_result"]["output"],
)
)
except Exception:
self.send("{}: => No check result.".format(srv["name"]))
def on_nicknameinuse(self, c, e):
c.nick( + "_")
c.nick(+"_")
def on_welcome(self, c, e):
c.join(self.channel)
@ -232,90 +283,106 @@ class IcingaBot(Icinga2ServiceManager, irc.bot.SingleServerIRCBot):
self.do_command(e, e.arguments[0])
def on_pubmsg(self, c, e):
if e.arguments[0].startswith (settings['irccmd']):
if e.arguments[0].startswith(settings["irccmd"]):
self.do_command(e, e.arguments[0][1:])
return
a = e.arguments[0].split(":", 1)
if len(a) > 1 and a[0].lower() == self.connection.get_nickname().lower():
if (
len(a) > 1
and a[0].lower() == self.connection.get_nickname().lower()
):
self.do_command(e, a[1].strip())
return
def do_cmd (self, s):
self.args= None
l = s.split(' ', 1)
cmd= l[0]
if len(l)>1:
self.args= l[1].strip()
def do_cmd(self, s):
self.args = None
tokens = s.split(" ", 1)
cmd = tokens[0]
if len(tokens) > 1:
self.args = tokens[1].strip()
for k,v in commands.items():
if cmd == k and hasattr(self, "do_"+cmd):
return getattr(self, "do_"+cmd)
for k, v in commands.items():
if cmd == k and hasattr(self, "do_" + cmd):
return getattr(self, "do_" + cmd)
if "synonyms" in v:
for s in v["synonyms"]:
if re.match(s, cmd) and hasattr(self, "do_"+k):
return getattr(self, "do_"+k)
if re.match(s, cmd) and hasattr(self, "do_" + k):
return getattr(self, "do_" + k)
return False
def do_help (self, c, e):
for k,v in commands.items():
def do_help(self, c, e):
for k, v in commands.items():
self.send("{} -- {}".format(k, v["help"]))
if "synonyms" in v:
self.send(" synonyms: {}".format(", ".join(v["synonyms"])))
def do_die (self, c, e):
print('Ok master, I die. Aaargh...')
self.die ()
def do_die(self, c, e):
print("Ok master, I die. Aaargh...")
self.die()
def do_leave (self, c, e):
def do_leave(self, c, e):
self.disconnect()
def do_list (self, c, e):
def do_list(self, c, e):
if self.notifications:
for srv in self.notifications:
try:
self.send("{}: => {}".format(srv['name'], srv['attrs']['last_check_result']['output']))
except:
self.send("{}: => No check result.".format(srv['name']))
self.send(
"{}: => {}".format(
srv["name"],
srv["attrs"]["last_check_result"]["output"],
)
)
except Exception:
self.send("{}: => No check result.".format(srv["name"]))
else:
self.send("Nothing particularly exciting.")
def do_ack (self, c, e):
if self.args == None:
self.send(e.source.nick + ": usage: !ack <service|all> [: comment]")
def do_ack(self, c, e):
if self.args is None:
self.send(
e.source.nick + ": usage: !ack <service|all> [: comment]"
)
return
l = self.args.split(':', 1)
srv, comment= l[0].strip(), ''
if len(l)>1:
comment= l[1].strip()
if srv == 'all':
srv= None
tokens = self.args.split(":", 1)
srv, comment = tokens[0].strip(), ""
if len(tokens) > 1:
comment = tokens[1].strip()
if srv == "all":
srv = None
self.ack_service(srv, comment, e.source.nick)
def do_recheck (self, c, e):
if self.args == 'all':
self.args == None
def do_recheck(self, c, e):
if self.args == "all":
self.args = None
self.recheck_service(self.args)
self.connection.execute_delayed(1, self.refresh_notifications)
def do_mute (self, c, e):
self.muted= True
self.suffix_nick ('[zZz]')
def do_mute(self, c, e):
self.muted = True
self.suffix_nick("[zZz]")
self.connection.execute_delayed(3600, self.refresh_notifications)
def do_command(self, e, cmd):
nick = e.source.nick
c = self.connection
if self.do_cmd (cmd):
self.unmute ()
(self.do_cmd (cmd)) (c, e)
if self.do_cmd(cmd):
self.unmute()
(self.do_cmd(cmd))(c, e)
else:
self.send(nick + ": Not understood: " + cmd)
def main():
bot = IcingaBot(settings['ircchan'], settings['ircnick'], settings['ircsrv'], settings['ircport'])
bot = IcingaBot(
settings["ircchan"],
settings["ircnick"],
settings["ircsrv"],
settings["ircport"],
)
bot.start()
if __name__ == "__main__":
main()