From ab2fe9c87f01de6efb6d85b7cc4b31f05815a3ce Mon Sep 17 00:00:00 2001 From: Shauna Date: Fri, 16 May 2014 17:18:45 -0400 Subject: [PATCH] refactored bot.py into newbot.py, added some tests --- newbot.py | 231 +++++++++++++++++++++++++++++++++++++++++++++++++ test_newbot.py | 115 ++++++++++++++++++++++++ test_nicks.csv | 2 + 3 files changed, 348 insertions(+) create mode 100644 newbot.py create mode 100644 test_newbot.py create mode 100644 test_nicks.csv diff --git a/newbot.py b/newbot.py new file mode 100644 index 0000000..e02f0e4 --- /dev/null +++ b/newbot.py @@ -0,0 +1,231 @@ +# Welcome to WelcomeBot. Find source, documentation, etc here: https://github.com/shaunagm/WelcomeBot/ Licensed https://creativecommons.org/licenses/by-sa/2.0/ + +# Import some necessary libraries. +import socket +import sys +import time +import csv +import Queue +import random +import re +from threading import Thread + +# Some basic variables used to configure the bot. +server = "irc.freenode.net" +channel = "#openhatch-bots" +botnick = "WelcomeBot2" +channel_greeters = ['shauna'] #'paulproteus', 'marktraceur'] +hello_list = [r'hello', r'hi', r'hey', r'yo', r'sup'] +help_list = [r'help', r'info', r'faq', r'explain yourself'] + + +######################### +### Class Definitions ### +######################### + +# Defines a bot +class Bot(object): + + def __init__(self, nick_source='nicks.csv', wait_time=60): + self.nick_source = nick_source + self.wait_time = wait_time + self.known_nicks = [] + with open(self.nick_source, 'rb') as csv_file: + csv_file_data = csv.reader(csv_file, delimiter=',', quotechar='|') + for row in csv_file_data: + self.known_nicks.append(row) + self.newcomers = [] + self.hello_regex = re.compile(get_regex(hello_list), re.I) # Regexed version of hello list + self.help_regex = re.compile(get_regex(help_list), re.I) # Regexed version of help list + + # Adds the current newcomer's nick to nicks.csv and known_nicks. + def add_known_nick(self,new_known_nick): + new_known_nick = new_known_nick.replace("_", "") + self.known_nicks.append([new_known_nick]) + with open(self.nick_source, 'a') as csvfile: + nickwriter = csv.writer(csvfile, delimiter=',', quotechar='|', + quoting=csv.QUOTE_MINIMAL) + nickwriter.writerow([new_known_nick]) + +# Defines a newcomer object +class NewComer(object): + + def __init__(self, nick, bot): + self.nick = nick + self.born = time.time() + bot.newcomers.append(self) + + def around_for(self): + return time.time() - self.born + + +######################### +### Startup Functions ### +######################### + +# Creates a socket that will be used to send and receive messages, +# then connects the socket to an IRC server and joins the channel. +def irc_start(): + global ircsock + ircsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + ircsock.connect((server, 6667)) # Here we connect to server using port 6667. + ircsock.send("USER {0} {0} {0} :This is http://openhatch.org/'s greeter bot" + ".\n".format(botnick)) # bot authentication + ircsock.send("NICK {}\n".format(botnick)) # Assign the nick to the bot. + ircsock.send("JOIN {} \n".format(channel)) # Joins channel + +# Creates a separate thread for incoming messages (to combat concurrency issues) +def thread_start(): + global q + q = Queue.Queue() # Creates a Queue that will hold the incoming messages. + t = Thread(target=msg_handler) # Creates a separate thread running msg_hander function (below) + t.daemon = True + t.start() + +# Reads the messages from the server and adds them to the Queue and prints +# them to the console. This function will be run in a thread, see below. +def msg_handler(): + while True: + new_msg = ircsock.recv(2048) # receive data from the server + new_msg = new_msg.strip('\n\r') # removing any unnecessary linebreaks + q.put(new_msg) # put in queue for main loop to read + print(new_msg) #### Potentially make this a log instead? + +# Called by bot on startup. Builds a regex that matches one of the options + (space) botnick. +def get_regex(options): + pattern = "(" + for s in options: + pattern += s + pattern += "|" + pattern = pattern[:-1] + pattern += ").({})".format(botnick) + return pattern + + +######################### +### General Functions ### +######################### + +# This welcomes the "person" passed to it. +def welcome_nick(newcomer): + ircsock.send("PRIVMSG {0} :Welcome {1}! The channel is pretty quiet " + "right now, so I though I'd say hello, and ping some people " + "(like {2}) that you're here. If no one responds for a " + "while, try emailing us at hello@openhatch.org or just try " + "coming back later. FYI, you're now on my list of known " + "nicknames, so I won't bother you again." + "\n".format(channel, newcomer, greeter_string("and"))) + +# Checks and manages the status of newcomers. +def process_newcomers(bot, newcomerlist=[], welcome=1): + for person in newcomerlist: + if welcome == 1: + welcome_nick(person.nick) + bot.add_known_nick(person.nick) + bot.newcomers.remove(person) + +# Checks for messages. +def check_messages(): + ircmsg = q.get() # get the next msg in the queue + actor = ircmsg.split(":")[1].split("!")[0] # and get the nick of the msg sender + return ircmsg, actor # Assumes that the above parsing works. :/ + +# This function parses messages and responds to them appropriately. +def message_response(bot, ircmsg, actor): + + # if someone other than a newcomer speaks into the channel + if ircmsg.find("PRIVMSG " + channel) != -1 and actor not in [i.nick for i in bot.newcomers]: + process_newcomers(bot.newcomers,welcome=0) # Process/check newcomers without welcoming them + + # if someone (other than the bot) joins the channel + if ircmsg.find("JOIN " + channel) != -1 and actor != botnick: + if actor.replace("_", "") not in bot.known_nicks and (i.nick for i in bot.newcomers): # And they're new + NewComer(actor, bot) + + # If someone parts or quits the #channel... + if ircmsg.find("PART " + channel) != -1 or ircmsg.find("QUIT") != -1: + for i in bot.newcomers: # and that person is on the newlist + if actor == i.nick: + bot.newcomers.remove(i) # remove them from the list + + # If someone talks to (or refers to) the bot. + if botnick.lower() and "PRIVMSG".lower() in ircmsg.lower(): + if bot.hello_regex.search(ircmsg): + bot_hello(random.choice(hello_list), actor) + if bot.help_regex.search(ircmsg): + bot_help() + + # If someone tries to change the wait time... + if ircmsg.find(botnick + " --wait-time ") != -1: + bot.wait_time = wait_time_change(actor, ircmsg) # call this to check and change it + + # If the server pings us then we've got to respond! + if ircmsg.find("PING :") != -1: + pong() + + +############################################################# +### Bot Response Functions (called by message_response()) ### +############################################################# + +# This function responds to a user that inputs "Hello Mybot". +def bot_hello(greeting, actor): + ircsock.send("PRIVMSG {0} :{1} {2}\n".format(channel, greeting, actor)) + +# This function explains what the bot is when queried. +def bot_help(): + ircsock.send("PRIVMSG {} :I'm a bot! I'm from here . You can change my behavior by " + "submitting a pull request or by talking to shauna" + ".\n".format(channel)) + +# Returns a grammatically correct string of the channel_greeters. +def greeter_string(conjunction): + greeters = "" + if len(channel_greeters) > 2: + for name in channel_greeters[:-1]: + greeters += "{}, ".format(name) + greeters += "{0} {1}".format(conjunction, channel_greeters[-1]) + elif len(channel_greeters) == 2: + greeters = "{0} {1} {2}".format(channel_greeters[0], conjunction, + channel_greeters[1]) + else: + greeters = channel_greeters[0] + return greeters + +# This function is used to change the wait time from the channel. +# It confirms that the attempt is allowed and then returns the requested value. +# If the attempt is not allowed, a message is sent to help +def wait_time_change(actor, ircmsg): + for admin in channel_greeters: + if actor == admin: + finder = re.search(r'\d\d*', re.search(r'--wait-time \d\d*', ircmsg) + .group()) + ircsock.send("PRIVMSG {0} :{1} the wait time is changing to {2} " + "seconds.\n".format(channel, actor, finder.group())) + return int(finder.group()) + ircsock.send("PRIVMSG {0} :{1} you are not authorized to make that " + "change. Please contact one of the channel greeters, like {2}, for " + "assistance.\n".format(channel, actor, greeter_string("or"))) + +# Responds to server Pings. +def pong(): + ircsock.send("PONG :pingis\n") + + +########################## +### The main function. ### +########################## + +def main(): + irc_start() + thread_start() + WelcomeBot = Bot() + while 1: # Loop forever + process_newcomers(WelcomeBot, [i for i in WelcomeBot.newcomers if i.around_for() > WelcomeBot.wait_time]) + if q.empty() == 0: # If the queue is not empty... + message_response(WelcomeBot, *check_messages()) # Checks for messages, gets a message text and actor, and responds + +if __name__ == "__main__": # This line tells the interpreter to only execute main() if the program is being run, not imported. + sys.exit(main()) + diff --git a/test_newbot.py b/test_newbot.py new file mode 100644 index 0000000..80701c4 --- /dev/null +++ b/test_newbot.py @@ -0,0 +1,115 @@ +# Yay tests! + +import unittest +import newbot +import time + +class TestBotClass(unittest.TestCase): + + def setUp(self): + self.bot = newbot.Bot() + + def test_csv_source(self): + self.assertEqual(self.bot.nick_source, 'nicks.csv') + + def test_known_nicks_setup(self): + bot = newbot.Bot('test_nicks.csv') + self.assertEqual(bot.known_nicks, [['Alice'], ['Bob']]) + + def test_wait_time(self): + self.assertEqual(self.bot.wait_time, 60) + + def test_custom_wait_time(self): + bot = newbot.Bot(wait_time=30) + self.assertEqual(bot.wait_time, 30) + + def test_newcomers_setup(self): + self.assertEqual(self.bot.newcomers, []) + + def test_add_nick_to_list(self): + self.bot.known_nicks = [['Fluffy'], ['Spot']] + self.bot.add_known_nick('Roger') + self.assertEqual(self.bot.known_nicks,[['Fluffy'], ['Spot'], ['Roger']]) + + def test_add_nick_underscore_removal(self): + self.bot.known_nicks = [['Fluffy'], ['Spot']] + self.bot.add_known_nick('Roger__') + self.assertEqual(self.bot.known_nicks,[['Fluffy'], ['Spot'], ['Roger']]) + + def test_add_nick_to_csv(self): + bot = newbot.Bot('test_nicks.csv') + bot.add_known_nick('Roger__') + self.assertEqual(bot.known_nicks, [['Alice'], ['Bob'], ['Roger']]) + + def tearDown(self): + with open('test_nicks.csv', 'w') as csv_file: + csv_file.write('Alice\nBob') + +class TestNewComerClass(unittest.TestCase): + + def setUp(self): + self.bot = newbot.Bot('test_nicks.csv') + self.NewComer = newbot.NewComer('Nancy', self.bot) + + def test_newcomer_init_nick(self): + self.assertEqual(self.NewComer.nick, 'Nancy') + + def test_newcomer_init_born(self): + newComer = newbot.NewComer('Baby', newbot.Bot()) + time.sleep(0.01) + self.assertAlmostEqual(newComer.born, time.time() - .01, places=2) + + def test_add_newcomer_to_bot(self): + pass + + def test_newcomer_around_for(self): + newComer = newbot.NewComer('Shauna', newbot.Bot()) + time.sleep(0.01) + self.assertAlmostEqual(newComer.around_for(), .01, places=2) + +# +# Not sure how to test irc_start, thread_start or msg_handler yet. +# + +class TestProcessNewcomers(unittest.TestCase): + + def setUp(self): + self.bot = newbot.Bot('test_nicks.csv', wait_time=.1) + newbot.NewComer('Harry', self.bot) + newbot.NewComer('Hermione', self.bot) + time.sleep(.15) + newbot.NewComer('Ron', self.bot) + + def test_check_new_newcomers(self): + newbot.process_newcomers(self.bot, [i for i in self.bot.newcomers if i.around_for() > self.bot.wait_time], welcome=0) + self.assertEqual(len(self.bot.newcomers), 1) + + def test_check_new_known_nicks(self): + newbot.process_newcomers(self.bot, [i for i in self.bot.newcomers if i.around_for() > self.bot.wait_time], welcome=0) + self.assertEqual(self.bot.known_nicks,[['Alice'],['Bob'],['Harry'],['Hermione']]) + + ## Should be a test of the welcome=1/welcome=0 functionality here, but not sure how to do that yet since welcome() calls ircsock. + + def tearDown(self): + with open('test_nicks.csv', 'w') as csv_file: + csv_file.write('Alice\nBob') + +# +# Not sure how to test check_messages. +# + +class TestMessageResponse(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + +class TestProcessNewcomers(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass diff --git a/test_nicks.csv b/test_nicks.csv new file mode 100644 index 0000000..efbdabc --- /dev/null +++ b/test_nicks.csv @@ -0,0 +1,2 @@ +Alice +Bob \ No newline at end of file