Update to Scoopbot

I updated scoopbot to use a database instead of just keeping a list of sites in memory. I had not done anything with SQLite before, so this was a learning experience for me. Please leave a comment if you can suggest a better, more efficient, or more Pythonic way of doing the database work.

This change makes the bot more robust. Previously, if the bot went down and was restarted, it ignored anything posted in the interim, because it rebuilt its list of latest posts on restart. Now, by checking against the database at restart, it should be able to tell which posts were added while it was down.

I also used this opportunity to add individualized feeds for each user. In addition, anybody who is peered with it on Pestnet can add feeds in the general channel. This is a privilege; if it gets abused, I will change it back so that only I can add feeds.

The new version of scoopbot lives on the Pestnet. If you would like to be added as a peer of scoopbot, please contact me (PeterL on the Pestnet, or leave a comment here).

One change I am thinking of adding is a way to control the bot. With the current version of Blatta, the only way to add peers for the bot is to shut it down and log in to Blatta manually. My idea is to add a table where I can drop in a peer, key, and address, and the bot would pick them up and add them.

Here is the code for scoopbot (I will generate and post a vpatch shortly):


#!/usr/bin/python

##############################################################################
## 'Scoopbot', an IRC-RSS feed monitor.                                     ##
##                                                                          ##
## The IRC part of this is based off of 'Watchglass' by asciilifeform,      ##
## see http://www.loper-os.org/?p=3665                                      ##
##                                                                          ##
## (C) 2021, 2022 Peter Lambert ( peterl.xyz )                              ##
##                                                                          ##
## You do not have, nor can you ever acquire the right to use, copy or      ##
## distribute this software ; should you use this software for any purpose, ##
## or copy and distribute it to anyone or in any manner, you are breaking   ##
## the laws of whatever soi-disant jurisdiction, and you promise to         ##
## continue doing so for the indefinite future. In any case, please         ##
## always : read and understand any software ; verify any PGP signatures    ##
## that you use - for any purpose.                                          ##
##############################################################################

import ConfigParser
import Queue
import datetime
import logging
import re
import socket
import sqlite3
import sys
import threading
import time
import urllib2
from datetime import datetime
from xml.etree.ElementTree import XML

##############################################################################
## Config
##############################################################################

# Version. If changing this program, always set this to same # as in MANIFEST

Ver = 744610

cfg = ConfigParser.ConfigParser()

# Single mandatory arg: config file path
if len(sys.argv[1:]) != 1:
    # Wrong number of args: print usage and exit with a failure status.
    # sys.exit is used rather than the site-provided exit(), which is not
    # guaranteed to exist (e.g. under `python -S`).
    print(sys.argv[0] + " CONFIG")
    sys.exit(1)

# Read Config; the file is closed as soon as it has been parsed.
try:
    with open(sys.argv[1]) as cfg_file:
        cfg.readfp(cfg_file)
except (IOError, ConfigParser.Error) as e:
    print("Invalid config: %s" % e)
    sys.exit(1)

# Get log path and IRCism debug toggle.  These are needed before logging can
# be configured; a missing key is reported like any other config error
# instead of dying with a traceback.
try:
    logpath = cfg.get("bofh", "log")
    irc_dbg = int(cfg.get("irc", "irc_dbg"))
except Exception as e:
    print("Invalid config: %s" % e)
    sys.exit(1)

log_lvl = logging.DEBUG if irc_dbg == 1 else logging.INFO

# Init debug logger:
logging.basicConfig(filename=logpath, filemode='a', level=log_lvl,
                    format='%(asctime)s %(levelname)s %(message)s',
                    datefmt='%d-%b-%y %H:%M:%S')

# Get the remaining knob values:
try:
    # IRCism:
    Buf_Size = int(cfg.get("tcp", "bufsize"))
    Timeout = int(cfg.get("tcp", "timeout"))
    TX_Delay = float(cfg.get("tcp", "t_delay"))
    Servers = [x.strip() for x in cfg.get("irc", "servers").split(',')]
    Port = int(cfg.get("irc", "port"))
    Nick = cfg.get("irc", "nick")
    Pass = cfg.get("irc", "pass")
    Channels = [x.strip() for x in cfg.get("irc", "chans").split(',')]
    Join_Delay = int(cfg.get("irc", "join_t"))
    Discon_TO = int(cfg.get("irc", "disc_t"))

    # Control:
    Prefix = cfg.get("control", "prefix")
    Src_URL = cfg.get("control", "src_url")
    Controlpass = cfg.get("control", "controlpass")

    # RSS Checking:
    CheckTime = int(cfg.get("sites", "checktime"))
    Max_Reported = int(cfg.get("sites", "max_reported"))
    Site_DB = cfg.get("sites", "db")

except Exception as e:
    print("Invalid config: %s" % e)
    sys.exit(1)

##############################################################################
## Feed Handling stuff
##############################################################################

# Used to hold list of sites to send to IRC
NewPosts = Queue.Queue()

# Take a url to a RSS feed and return the title as a string and an array of
# dicts for the items
def parse(my_url):
    # Load data from the given url
    resp = urllib2.urlopen(my_url).read()

    # Find where the xml tree starts and parse the data as an xml structure
    channel = XML(resp[resp.find('<'):])
    site_title = channel.find('./channel/title').text.strip()

    # Convert from XML to a python structure
    items = []
    for k in channel.findall('.//item'):
        items.append({'title': k.find('title').text.strip(),
                      'link': k.find('link').text})

    return site_title, items

def check_rss():
    """Poll every feed in the `sites` table forever, queueing new posts.

    Uses its own sqlite connection.  Every CheckTime minutes each site is
    fetched with parse() and its items are examined in feed order until the
    first link already present in `pages` (this assumes feeds list newest
    items first).  Each unseen item, at most Max_Reported per site per pass,
    is recorded in `pages` and queued on NewPosts once per listener as a
    [listener, text] pair.  Errors are logged; they never end the loop.
    """
    DB_Conn = sqlite3.connect(Site_DB)
    cur = DB_Conn.cursor()
    try:
        while True:
            try:
                cur.execute('SELECT site FROM sites')
                sites = [row[0] for row in cur.fetchall()]
            except sqlite3.Error as e:
                # e.g. "database is locked" while a command handler writes.
                # Left uncaught, this would silently kill the checker thread.
                logging.warning('could not read site list: %s' % e)
                sites = []

            for blog in sites:
                try:
                    cur.execute('SELECT user FROM listening WHERE site=?',
                                (blog, ))
                    listeners = [row[0] for row in cur.fetchall()]
                    blog_title, posts = parse(blog)
                    count = 0
                    for item in posts:
                        cur.execute(
                            'SELECT 1 FROM pages WHERE site=? AND link=?',
                            (blog, item['link']))
                        if cur.fetchone():
                            # Reached a post we already know about.
                            break
                        count += 1
                        text = "%s: [ %s ][ %s ]" % (
                            blog_title, item['link'], item['title'])
                        for listener in listeners:
                            NewPosts.put([listener, text])
                        cur.execute('INSERT INTO pages VALUES (?, ?, ?)',
                                    (blog, item['title'], item['link']))
                        # Commit per item so every queued post is recorded
                        # even if a later item fails.
                        DB_Conn.commit()

                        if count >= Max_Reported:
                            break

                except Exception as e:
                    logging.warning('problem with check of %s : %s' % (blog, e))
                    # Drop any half-done write so its lock does not block the
                    # command handlers' connections.
                    try:
                        DB_Conn.rollback()
                    except sqlite3.Error as rb_err:
                        logging.warning('rollback failed: %s' % rb_err)

            time.sleep(CheckTime * 60)
    finally:
        DB_Conn.close()

##############################################################################
## IRC Bot
##############################################################################

# Both timers start at program start-up: 'uptime' is measured from
# time_last_conn, the disconnection timeout from time_last_recv.
time_last_conn = datetime.now()
time_last_recv = datetime.now()

# No socket and no link until init_socket()/connect() run.
sock, connected = None, False

# Held by send() around each socket write.
irc_tx_lock = threading.Lock()

def init_socket():
    """Create a fresh TCP socket in the global `sock`; mark the link as down.

    Nagle's algorithm is disabled so short IRC lines go out immediately.
    """
    global sock
    global connected
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Disable Nagle's algorithm for transmit operations
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    # Needs the global declaration above: without it this assignment only
    # created a local and the module-level flag was never reset.
    connected = False

def deinit_socket():
    """Close the global socket and flag the IRC link as down."""
    global sock, connected
    sock.close()
    connected = False

# Connect to given host:port; return whether connected
def connect(host, port):
    """Attempt one TCP connection from the global socket to host:port.

    The configured Timeout is applied first.  Socket errors and timeouts
    are logged as warnings; anything else is logged with a traceback.
    Returns True on success, False on any failure.
    """
    logging.info("Connecting to %s:%s" % (host, port))
    sock.settimeout(Timeout)
    try:
        sock.connect((host, port))
    except (socket.timeout, socket.error) as err:
        logging.warning(err)
        succeeded = False
    except Exception as err:
        logging.exception(err)
        succeeded = False
    else:
        logging.info("Connected.")
        succeeded = True
    return succeeded

# Attempt connect to each of hosts, in order, on port; return whether connected
def connect_any(hosts, port):
    """Try each host in order on `port`; return True at the first success.

    After a failed attempt the global socket is replaced before the next
    host is tried.  The state of a socket whose connect() failed is
    unspecified, and a timed-out attempt may still be in progress, so
    reusing that socket could make every later host fail too.  On overall
    failure the global socket is a fresh, unconnected one.
    """
    for host in hosts:
        if connect(host, port):
            return True
        deinit_socket()
        init_socket()
    return False

# Transmit IRC message
def send(message):
    """Transmit one IRC protocol line over the global socket.

    The terminating CR/LF is added here.  Trailing CR/LF already present
    in `message` is removed, and embedded CR/LF runs are replaced by a
    space.  This guarantees one call puts exactly one line on the wire;
    feed titles and links reach here via speak() and are not trusted.

    Sleeps TX_Delay first (throttling).  Returns True once the line has
    been handed to the socket.  Returns False if not connected or if the
    send failed; a socket error also tears the link down.
    """
    if not connected:
        logging.warning("Tried to send while disconnected?")
        return False
    time.sleep(TX_Delay)
    message = re.sub(r'[\r\n]+', ' ', message.strip('\r\n'))
    logging.debug("> '%s'" % message)
    line = "%s\r\n" % message
    with irc_tx_lock:
        try:
            # sendall: plain send() may transmit only part of the buffer.
            sock.sendall(line.encode("utf-8"))
        except (socket.timeout, socket.error):
            logging.warning("Socket could not send! Disconnecting.")
            deinit_socket()
            return False
        except Exception as e:
            logging.exception(e)
            return False
    return True

# Speak given message on a selected channel
def speak(channel, message):
    """Send `message` as a PRIVMSG to `channel` (a channel name or a nick)."""
    line = "PRIVMSG %s :%s" % (channel, message)
    send(line)

# Standard incoming IRC line (excludes fleanode liquishit, etc)
# Groups: (sender prefix, target, text).  Raw strings are used because '\S'
# is not a valid string escape (warned about by newer Pythons).
irc_line_re = re.compile(r"^:(\S+)\s+PRIVMSG\s+(\S+)\s+:(.*)")

# 'Actions': CTCP ACTION, i.e. '\x01ACTION text\x01'.  Anchored on the \x01
# delimiter so that ordinary lines merely containing "ACTION " (a nick like
# "FACTION", the word "TRANSACTION") are not mistaken for actions, which
# used to replace the line's text and break commands.
irc_act_re = re.compile(r"\x01ACTION\s+(.*)")

# input looks like ':PeterL PRIVMSG #pest :!s uptime'
# direct message looks like ':PeterL PRIVMSG scoopbot :just checking'

# A line was received from IRC
def received_line(line):
    """Handle one decoded line from the server.

    PINGs are answered with a PONG carrying the PING's first argument.
    PRIVMSG lines are split into sender, target and text.  A private
    message (target == Nick) is redirected back to the sender.  CTCP
    ACTION text is unwrapped.  The result is passed to eat_line(); any
    other line is only logged.
    """
    if line.startswith("PING"):
        send("PONG " + line.split()[1])
        return

    logging.debug("< '%s'" % line)
    parsed = re.search(irc_line_re, line)
    if not parsed:
        return

    user, chan, text = [part.strip() for part in parsed.groups()]
    if chan == Nick:
        # Private message: reply to the sender, not to ourselves.
        chan = user

    act = re.search(irc_act_re, line)
    action = bool(act)
    if act:
        text = act.group(1)
        # Drop the closing CTCP delimiter, if present.
        if text.endswith('\x01'):
            text = text[:-1]

    eat_line(user, chan, text, action)

# True once JOINs have been sent.
# NOTE(review): this must be reset to False whenever a new connection is
# made, or the channels are not rejoined after a reconnect.
joined = False

def join_chans():
    """Send a JOIN for every configured channel, at most once while `joined`."""
    global joined
    if joined:
        return
    for chan in Channels:
        logging.info("Joining channel '%s'..." % chan)
        # send() appends the CRLF itself; an extra one here produced a
        # blank protocol line after every JOIN.
        send("JOIN %s" % chan)
    joined = True

# IRCate until we get disconnected
def _relay_new_posts(limit=10):
    """Announce up to `limit` queued feed posts from NewPosts.

    Each queue entry is a [target, text] pair and is spoken to `target`.
    Stops early if the queue is empty or the link has dropped, leaving the
    rest for later.  The cap bounds how long the receive loop goes without
    reading, because every send() first sleeps TX_Delay.  A big backlog
    could otherwise make us miss a server PING.
    """
    for _ in range(limit):
        if not connected:
            return
        try:
            target, text = NewPosts.get_nowait()
        except Queue.Empty:
            return
        speak(target, "New post on %s" % text)

def _dispatch_lines(pending):
    """Pass every complete line in the raw bytes `pending` to received_line().

    recv() can end in the middle of a line (or of a UTF-8 sequence), so the
    unfinished tail is returned for the caller to prepend to the next chunk.
    Each line is decoded on its own as UTF-8, falling back to latin-1.  A
    failure while handling one line is logged and does not stop the rest.
    """
    lines = pending.split(b'\n')
    tail = lines.pop()
    for raw in lines:
        raw = raw.rstrip(b'\r')
        if not raw:
            continue
        try:
            line = raw.decode("utf-8")
        except UnicodeDecodeError:
            line = raw.decode('latin-1')
        try:
            received_line(line)
        except Exception as e:
            logging.exception(e)
    return tail

def irc(retry_delay=5):
    """Run one IRC session: connect, register, then read until the link drops.

    The server list is retried, pausing `retry_delay` seconds between
    rounds, until a connection succeeds.  After PASS/NICK/USER, the
    channels are joined once more than two packets have arrived.  Incoming
    lines are then dispatched and queued feed posts relayed.  Returns when
    `connected` goes False: a socket error, EOF, or more than Discon_TO
    seconds of silence.
    """
    global connected
    global time_last_conn
    global time_last_recv
    global joined

    # Initialize a socket
    init_socket()

    # Connect to one among the specified servers, in given priority.  Pause
    # between rounds instead of spinning when no server is reachable.
    while not connected:
        connected = connect_any(Servers, Port)
        if not connected:
            logging.warning("No server reachable, retrying in %s s" %
                            retry_delay)
            time.sleep(retry_delay)

    # New session: reset both timers (so the silence check does not use the
    # previous connection's last receive) and the join flag (so channels
    # are joined again after a reconnect).
    time_last_conn = datetime.now()
    time_last_recv = time_last_conn
    joined = False

    # Auth to server; send() adds the CRLF.
    # If this is a production bot, rather than test, there will be a PW:
    if Pass != "":
        send("PASS %s" % Pass)
    send("NICK %s" % Nick)
    send("USER %s %s %s :%s" % (Nick, Nick, Nick, Nick))

    time.sleep(Join_Delay)  # wait to join until server eats auth

    lnrec = 0
    pending = b''
    while connected:
        try:
            data = sock.recv(Buf_Size)
        except socket.timeout:
            logging.debug("Receive timed out")
            # Determine whether the connection has timed out:
            since_recv = (datetime.now() - time_last_recv).total_seconds()
            if since_recv > Discon_TO:
                logging.info("Exceeded %d seconds of silence " % Discon_TO
                             + "from server: disconnecting!")
                deinit_socket()
            elif lnrec > 2:
                # Quiet link: still deliver queued posts.
                _relay_new_posts()
            continue
        except socket.error:
            logging.warning("Receive socket error, disconnecting.")
            deinit_socket()
            continue
        except Exception as e:
            logging.exception(e)
            deinit_socket()
            continue

        if not data:
            logging.warning("Receive socket closed, disconnecting.")
            deinit_socket()
            continue

        time_last_recv = datetime.now()  # Received anything -- reset timer
        pending = _dispatch_lines(pending + data)

        lnrec += 1
        if lnrec > 2:
            join_chans()
            _relay_new_posts()

##############################################################################

# Commands:

def cmd_add(arg, user, chan):
    """Make `chan` follow the feed whose URL is the first word of `arg`.

    The feed is fetched first, so an unreachable or unparsable URL is
    reported and nothing is stored.  A site new to the database goes into
    `sites`, and its current items into `pages`, so only posts published
    afterwards get announced.  Then (chan, site) is added to `listening`
    unless already present.  Replies are spoken to `chan`.
    """
    site = arg.split(' ')[0].strip()

    # The URL comes straight from IRC, and urllib2 will also open file://,
    # ftp:// and others, so only web feeds are accepted.
    if not site.lower().startswith(('http://', 'https://')):
        speak(chan, "%s: usage: add <http(s) feed url>" % user)
        return

    try:
        site_title, articles = parse(site)
    except Exception as e:
        logging.exception(e)
        speak(chan, "Could not add site %s" % site)
        return

    DB_Conn = sqlite3.connect(Site_DB)
    try:
        cur = DB_Conn.cursor()

        # First subscription to this site: record it and mark everything
        # currently in the feed as already seen.
        cur.execute('SELECT 1 FROM sites WHERE site=?', (site, ))
        if not cur.fetchall():
            cur.execute('INSERT INTO sites VALUES (?, ?)', (site, site_title))
            cur.executemany('INSERT INTO pages VALUES (?, ?, ?)',
                            [(site, item['title'], item['link'])
                             for item in articles])

        # make sure that this is not a duplicate
        cur.execute('SELECT 1 FROM listening WHERE user=? AND site=?',
                    (chan, site))
        already_following = bool(cur.fetchall())
        if not already_following:
            cur.execute('INSERT INTO listening VALUES (?, ?)', (chan, site))

        DB_Conn.commit()
    finally:
        # Closed even when a query fails (e.g. database locked); uncommitted
        # changes are discarded.
        DB_Conn.close()

    # Reply only after the change has been committed.
    if already_following:
        speak(chan, "%s: It looks like you are already following %s" %
              (user, site))
    elif articles:
        speak(chan, "Added site %s, %s, latest: %s" %
              (site, site_title, articles[0]['link']))
    else:
        speak(chan, "Added site %s with no articles found yet." % site)

def cmd_list(arg, user, chan):
    """Say, one per line, each feed URL followed in `chan`."""
    conn = sqlite3.connect(Site_DB)
    cursor = conn.cursor()
    cursor.execute('SELECT site FROM listening WHERE user=?', (chan, ))
    followed = [row[0] for row in cursor.fetchall()]
    if not followed:
        speak(chan, 'no feeds followed here')
    for url in followed:
        speak(chan, '%s' % url)
    conn.close()

def cmd_help(arg, user, chan):
    """Tell `user` the available command words, in alphabetical order."""
    # Sorted: plain dict key order is arbitrary on Python 2, which made the
    # list come out jumbled.
    speak(chan, "%s: my valid commands are: %s" %
          (user, ', '.join(sorted(Commands))))

def cmd_remove(arg, user, chan):
    """Stop `chan` following the feed URL given as the first word of `arg`.

    When nobody follows the site any more, its `sites` row and remembered
    `pages` rows are deleted too.  Otherwise re-adding it later would insert
    every current item into `pages` a second time.  The outcome is reported
    on `chan`.
    """
    site = arg.split(' ')[0].strip()

    DB_Conn = sqlite3.connect(Site_DB)
    try:
        cur = DB_Conn.cursor()
        # drop from table listening
        cur.execute('DELETE FROM listening WHERE user=? AND site=?',
                    (chan, site))
        removed = cur.rowcount > 0

        # if no users are following site, drop it and its seen pages
        cur.execute('SELECT 1 FROM listening WHERE site=?', (site, ))
        if not cur.fetchall():
            cur.execute('DELETE FROM sites WHERE site=?', (site, ))
            cur.execute('DELETE FROM pages WHERE site=?', (site, ))

        DB_Conn.commit()
    finally:
        DB_Conn.close()

    if removed:
        speak(chan, "%s: removed %s" % (user, site))
    else:
        speak(chan, "%s: you are not following %s" % (user, site))

def cmd_src(arg, user, chan):
    """Tell `user` where the bot's source code is published (Src_URL)."""
    reply = "%s: my source code can be seen at: %s" % (user, Src_URL)
    speak(chan, reply)

def cmd_version(arg, user, chan):
    """Announce the program version number (Ver) on `chan`."""
    banner = "I am 'Scoopbot' version %s." % Ver
    speak(chan, banner)

def cmd_uptime(arg, user, chan):
    """Tell `user` how long it has been since time_last_conn (last connect)."""
    elapsed = datetime.now() - time_last_conn
    hours, leftover = divmod(elapsed.seconds, 3600)
    minutes = leftover // 60
    uptime_txt = '%dd %dh %dm' % (elapsed.days, hours, minutes)
    speak(chan, "%s: time since my last reconnect : %s" %
          (user, uptime_txt))

# Command word -> handler; every handler is called as handler(arg, user, chan).
Commands = dict([
    ("help", cmd_help),
    ("version", cmd_version),
    ("src", cmd_src),
    ("uptime", cmd_uptime),
    ("list", cmd_list),
    ("add", cmd_add),
    ("remove", cmd_remove),
])

##############################################################################

# All valid received lines end up here
def eat_line(user, chan, text, action):
    """Dispatch `text` as a bot command when it starts with Prefix.

    The first word after the prefix selects a handler from Commands.  The
    rest of the line, stripped, becomes its argument.  Unknown commands are
    answered with the help text.  Lines without the prefix are ignored.
    """
    if not text.startswith(Prefix):
        return

    body = text.partition(Prefix)[2].strip()
    pieces = [piece.strip() for piece in body.split(' ', 1)]
    command = pieces[0]
    arg = pieces[1] if len(pieces) > 1 else ""
    logging.debug("Dispatching command '%s' with arg '%s'.." %
                  (command, arg))

    handler = Commands.get(command)
    if handler is None:
        logging.debug("Invalid command: %s" % command)
        # Unknown command: reply with the help text instead.
        cmd_help("", user, chan)
        return
    handler(arg, user, chan)

##############################################################################

# IRCate; if disconnected, reconnect
def run():
    """Keep an IRC session going forever: start a new one after each drop."""
    while True:
        irc()
        logging.warning("Disconnected, will reconnect...")

# Run continuously: one thread keeps the IRC session alive, the other polls
# the feeds.  The main thread then waits on both, in the same order.
ircer = threading.Thread(target=run)
scooper = threading.Thread(target=check_rss)
for worker in (ircer, scooper):
    worker.start()
for worker in (ircer, scooper):
    worker.join()

Leave a Reply

MANDATORY: Adding 33 46 = ?
Answer before clicking "Submit", or comment will be discarded!