I updated scoopbot to use a database instead of just a list of sites in memory. I had not done anything with SQLite before, so this was a learning experience for me. Please leave a comment and let me know if you can suggest a better, more efficient, or more Pythonic way of doing the database stuff.
This change makes the bot more robust. Previously, if the bot went down and was restarted, it would ignore anything posted in the interim, since it rebuilt its list of latest posts from scratch on startup. Now, by checking new items against the database after a restart, it can catch posts that were published while it was down.
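For anyone who wants to run this themselves: the bot expects the SQLite file named by the db knob in the config to already contain three tables, sites, pages, and listening. A one-time setup roughly like the following matches the queries in the code (the title column names are just my labels, since the queries never reference them by name):

import sqlite3
conn = sqlite3.connect("scoopbot.db")  # use the same path as the 'db' knob in the config
conn.executescript("""
    CREATE TABLE IF NOT EXISTS sites (site TEXT, title TEXT);
    CREATE TABLE IF NOT EXISTS pages (site TEXT, title TEXT, link TEXT);
    CREATE TABLE IF NOT EXISTS listening (user TEXT, site TEXT);
""")
conn.commit()
conn.close()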
I also used this opportunity to add individualized feeds for each user: adding a feed in the channel follows it for the channel, while adding it in a private message to the bot follows it just for you. Anybody who is peered with the bot on Pestnet can now add feeds in the general channel. This is a privilege; if it gets abused, I will change it back so that only I can add feeds.
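In the channel it looks something like this (with the command prefix set to "!s", as in the example comment further down in the code; the URL is just a placeholder):

!s add http://example.com/rss.xml
!s list
!s remove http://example.com/rss.xml
!s help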
The new version of scoopbot lives on the Pestnet. If you would like to be added as a peer of scoopbot, please contact me (PeterL on the Pestnet, or leave a comment here).
A change I am thinking of adding is a way to control the bot: with the current version of Blatta, the only way to add peers for the bot is to shut it down and add them by logging in to Blatta manually. My idea is to add a table where I can drop in a peer, key, and address, and the bot would periodically check the table and add any entries it finds.
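A very rough sketch of what I mean (none of this is implemented yet; the table name, columns, and function below are made up for illustration): the bot would poll a table of pending peers and hand each row to whatever ends up doing the Blatta side of things, then clear the row.

import sqlite3
def poll_new_peers(db_path, add_peer):
    # 'add_peer' is a stand-in for whatever will actually talk to Blatta
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    cur.execute('CREATE TABLE IF NOT EXISTS newpeers '
                '(peer TEXT, key TEXT, address TEXT)')
    cur.execute('SELECT peer, key, address FROM newpeers')
    for peer, key, address in cur.fetchall():
        add_peer(peer, key, address)
        cur.execute('DELETE FROM newpeers WHERE peer=?', (peer, ))
    conn.commit()
    conn.close()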
Here is the code for scoopbot (I will generate and post a vpatch shortly):
#!/usr/bin/python
##############################################################################
## 'Scoopbot', an IRC-RSS feed monitor.                                     ##
##                                                                          ##
## The IRC part of this is based off of 'Watchglass' by asciilifeform,      ##
## see http://www.loper-os.org/?p=3665                                      ##
##                                                                          ##
## (C) 2021, 2022 Peter Lambert ( peterl.xyz )                              ##
##                                                                          ##
## You do not have, nor can you ever acquire the right to use, copy or      ##
## distribute this software ; should you use this software for any purpose, ##
## or copy and distribute it to anyone or in any manner, you are breaking   ##
## the laws of whatever soi-disant jurisdiction, and you promise to         ##
## continue doing so for the indefinite future. In any case, please         ##
## always : read and understand any software ; verify any PGP signatures    ##
## that you use - for any purpose.                                          ##
##############################################################################
import ConfigParser
import Queue
import datetime
import logging
import re
import socket
import sqlite3
import sys
import threading
import time
import urllib2
from datetime import datetime
from xml.etree.ElementTree import XML
##############################################################################
## Config
##############################################################################
# Version. If changing this program, always set this to same # as in MANIFEST
Ver = 744610
cfg = ConfigParser.ConfigParser()
# Single mandatory arg: config file path
if len(sys.argv[1:]) != 1:
    # If no args, print usage and exit:
    print sys.argv[0] + " CONFIG"
    exit(0)
# Read Config
cfg.readfp(open(sys.argv[1]))
# Get log path
logpath = cfg.get("bofh", "log")
# Get IRCism debug toggle
irc_dbg = int(cfg.get("irc", "irc_dbg"))
if irc_dbg == 1:
    log_lvl = logging.DEBUG
else:
    log_lvl = logging.INFO
# Init debug logger:
logging.basicConfig(filename=logpath, filemode='a', level=log_lvl,
                    format='%(asctime)s %(levelname)s %(message)s',
                    datefmt='%d-%b-%y %H:%M:%S')
# Get the remaining knob values:
try:
    # IRCism:
    Buf_Size = int(cfg.get("tcp", "bufsize"))
    Timeout = int(cfg.get("tcp", "timeout"))
    TX_Delay = float(cfg.get("tcp", "t_delay"))
    Servers = [x.strip() for x in cfg.get("irc", "servers").split(',')]
    Port = int(cfg.get("irc", "port"))
    Nick = cfg.get("irc", "nick")
    Pass = cfg.get("irc", "pass")
    Channels = [x.strip() for x in cfg.get("irc", "chans").split(',')]
    Join_Delay = int(cfg.get("irc", "join_t"))
    Discon_TO = int(cfg.get("irc", "disc_t"))
    # Control:
    Prefix = cfg.get("control", "prefix")
    Src_URL = cfg.get("control", "src_url")
    Controlpass = cfg.get("control", "controlpass")
    # RSS Checking:
    CheckTime = int(cfg.get("sites", "checktime"))
    Max_Reported = int(cfg.get("sites", "max_reported"))
    Site_DB = cfg.get("sites", "db")
except Exception as e:
    print "Invalid config: ", e
    exit(1)
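# For reference, the config file (passed as the single command line argument,
# e.g. "python scoopbot.py scoopbot.conf") needs these sections and keys; the
# values below are placeholders, not my live settings:
#   [bofh]
#   log = scoopbot.log
#   [tcp]
#   bufsize = 4096
#   timeout = 5
#   t_delay = 1
#   [irc]
#   servers = 127.0.0.1
#   port = 6667
#   nick = scoopbot
#   pass =
#   chans = #pest
#   join_t = 15
#   disc_t = 300
#   irc_dbg = 0
#   [control]
#   prefix = !s
#   src_url = http://peterl.xyz/
#   controlpass = changeme
#   [sites]
#   checktime = 15
#   max_reported = 5
#   db = scoopbot.db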
##############################################################################
## Feed Handling stuff
##############################################################################
# Used to hold list of sites to send to IRC
NewPosts = Queue.Queue()
# Take a url to a RSS feed and return the title as a string and an array of
# dicts for the items
def parse(my_url):
    # Load data from the given url
    resp = urllib2.urlopen(my_url).read()
    # Find where the xml tree starts and parse the data as an xml structure
    channel = XML(resp[resp.find('<'):])
    site_title = channel.find('./channel/title').text.strip()
    # Convert from XML to a python structure
    items = []
    for k in channel.findall('.//item'):
        items.append({'title': k.find('title').text.strip(),
                      'link': k.find('link').text})
    return site_title, items
def check_rss():
    global NewPosts
    # Open a connection to the database
    DB_Conn = sqlite3.connect(Site_DB)
    cur = DB_Conn.cursor()
    # Continually search for new posts
    while 1:
        cur.execute('SELECT site FROM sites')
        for row in cur.fetchall():
            blog = row[0]
            cur.execute('SELECT user FROM listening WHERE site=?', (blog, ))
            listeners = cur.fetchall()
            try:
                blog_title, posts = parse(blog)
                count = 0
                for item in posts:
                    cur.execute('SELECT 1 FROM pages WHERE site=? AND link=?',
                                (blog, item['link']))
                    if cur.fetchall():
                        break
                    else:
                        count += 1
                        for c in listeners:
                            NewPosts.put([c[0], "%s: [ %s ][ %s ]" % (
                                blog_title, item['link'], item['title'])])
                        cur.execute('INSERT INTO pages VALUES (?, ?, ?)',
                                    (blog, item['title'], item['link']))
                        DB_Conn.commit()
                    if count >= Max_Reported:
                        break
            except Exception as e:
                logging.warning('problem with check of %s : %s' % (blog, e))
        time.sleep(CheckTime * 60)
    DB_Conn.close()
##############################################################################
## IRC Bot
##############################################################################
# Used to compute 'uptime'
time_last_conn = datetime.now()
# Used to monitor disconnection timeout
time_last_recv = datetime.now()
# Socket will be here:
sock = None
# Initially we are not connected to anything
connected = False
# TX Lock for async. operations
irc_tx_lock = threading.Lock()
def init_socket():
    global sock
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Disable Nagle's algorithm for transmit operations
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    connected = False
def deinit_socket():
    global connected
    global sock
    sock.close()
    connected = False
# Connect to given host:port; return whether connected
def connect(host, port):
    logging.info("Connecting to %s:%s" % (host, port))
    sock.settimeout(Timeout)
    try:
        sock.connect((host, port))
    except (socket.timeout, socket.error) as e:
        logging.warning(e)
        return False
    except Exception as e:
        logging.exception(e)
        return False
    else:
        logging.info("Connected.")
        return True
# Attempt connect to each of hosts, in order, on port; return whether connected
def connect_any(hosts, port):
    for host in hosts:
        if connect(host, port):
            return True
    return False
# Transmit IRC message
def send(message):
    global connected
    if not connected:
        logging.warning("Tried to send while disconnected?")
        return False
    time.sleep(TX_Delay)
    logging.debug("> '%s'" % message)
    message = "%s\r\n" % message
    try:
        irc_tx_lock.acquire()
        sock.send(message.encode("utf-8"))
    except (socket.timeout, socket.error) as e:
        logging.warning("Socket could not send! Disconnecting.")
        deinit_socket()
        return False
    except Exception as e:
        logging.exception(e)
        return False
    finally:
        irc_tx_lock.release()
# Speak given message on a selected channel
def speak(channel, message):
    send("PRIVMSG %s :%s" % (channel, message))
# Standard incoming IRC line (excludes fleanode liquishit, etc)
irc_line_re = re.compile("""^:(\S+)\s+PRIVMSG\s+(\S+)\s+\:(.*)""")
# 'Actions'
irc_act_re = re.compile(""".*ACTION\s+(.*)""")
# input looks like ':PeterL PRIVMSG #pest :!s uptime'
# direct message looks like ':PeterL PRIVMSG scoopbot :just checking'
# A line was received from IRC
def received_line(line):
    # Process the traditional pingpong
    if line.startswith("PING"):
        send("PONG " + line.split()[1])
    else:
        logging.debug("< '%s'" % line)
        standard_line = re.search(irc_line_re, line)
        if standard_line:
            # Break this line into the standard segments
            (user, chan, text) = [s.strip() for s in standard_line.groups()]
            # Handle direct messages
            if chan == Nick:
                chan = user
            # Determine whether this line is an 'action' :
            action = False
            act = re.search(irc_act_re, line)
            if act:
                action = True
                text = act.group(1)
                # Remove turd if present:
                if text[-1:] == '\x01':
                    text = text[:-1]
            # This line is edible, process it.
            eat_line(user, chan, text, action)
joined = False
def join_chans():
    global joined
    if joined:
        return
    for chan in Channels:
        logging.info("Joining channel '%s'..." % chan)
        send("JOIN %s\r\n" % chan)
    joined = True
# IRCate until we get disconnected
def irc():
    global connected
    global time_last_conn
    global time_last_recv
    global sock
    global NewPosts
    # Initialize a socket
    init_socket()
    # Connect to one among the specified servers, in given priority :
    while not connected:
        connected = connect_any(Servers, Port)
    # Save time of last successful connect
    time_last_conn = datetime.now()
    # Auth to server
    # If this is a production bot, rather than test, there will be a PW:
    if Pass != "":
        send("PASS %s\r\n" % Pass)
    send("NICK %s\r\n" % Nick)
    send("USER %s %s %s :%s\r\n" % (Nick, Nick, Nick, Nick))
    time.sleep(Join_Delay)  # wait to join until server eats auth
    lnrec = 0
    while connected:
        try:
            data = sock.recv(Buf_Size)
            time_last_recv = datetime.now()  # Received anything -- reset timer
            lnrec = lnrec + 1
            if lnrec > 2:
                join_chans()
                if not NewPosts.empty():
                    m = NewPosts.get()
                    speak(m[0], "New post on %s" % (m[1]))
        except socket.timeout as e:
            logging.debug("Receive timed out")
            # Determine whether the connection has timed out:
            since_recv = (datetime.now() - time_last_recv).seconds
            if since_recv > Discon_TO:
                logging.info("Exceeded %d seconds of silence " % Discon_TO
                             + "from server: disconnecting!")
                deinit_socket()
            continue
        except socket.error as e:
            logging.warning("Receive socket error, disconnecting.")
            deinit_socket()
            continue
        except Exception as e:
            logging.exception(e)
            deinit_socket()
            continue
        else:
            if len(data) == 0:
                logging.warning("Receive socket closed, disconnecting.")
                deinit_socket()
                continue
            try:
                try:
                    data = data.strip(b'\r\n').decode("utf-8")
                except UnicodeDecodeError:
                    data = data.strip(b'\r\n').decode('latin-1')
                for l in data.splitlines():
                    received_line(l)
                continue
            except Exception as e:
                logging.exception(e)
                continue
##############################################################################
# Commands:
def cmd_add(arg, user, chan):
    # Add a feed for the chan
    site = arg.split(' ')[0].strip()
    try:
        site_title, articles = parse(site)
    except Exception as e:
        logging.exception(e)
        speak(chan, "Could not add site %s" % site)
        return
    # check to see if the site is in the db yet
    DB_Conn = sqlite3.connect(Site_DB)
    cur = DB_Conn.cursor()
    cur.execute('SELECT * FROM sites WHERE site=?', (site, ))
    if not cur.fetchall():
        cur.execute('INSERT INTO sites VALUES (?, ?)', (site, site_title))
        # update the latest articles
        for item in articles:
            cur.execute('INSERT INTO pages VALUES (?, ?, ?)',
                        (site, item['title'], item['link']))
    # make sure that this is not a duplicate
    cur.execute('SELECT * FROM listening WHERE user=? AND site=?', (chan, site))
    if not cur.fetchall():
        cur.execute('INSERT INTO listening VALUES (?, ?)', (chan, site))
        if articles:
            speak(chan, "Added site %s, %s, latest: %s" %
                  (site, site_title, articles[0]['link']))
        else:
            speak(chan, "Added site %s with no articles found yet." % site)
    else:
        speak(chan, "%s: It looks like you are already following %s" %
              (user, site))
    DB_Conn.commit()
    DB_Conn.close()
def cmd_list(arg, user, chan):
    # list feeds associated here
    DB_Conn = sqlite3.connect(Site_DB)
    cur = DB_Conn.cursor()
    cur.execute('SELECT site FROM listening WHERE user=?', (chan, ))
    sites = cur.fetchall()
    if sites:
        for row in sites:
            speak(chan, '%s' % row[0])
    else:
        speak(chan, 'no feeds followed here')
    DB_Conn.close()
def cmd_help(arg, user, chan):
    # Speak the 'help' text
    speak(chan, "%s: my valid commands are: %s" %
          (user, ', '.join(Commands.keys())))
def cmd_remove(arg, user, chan):
    site = arg.split(' ')[0].strip()
    # drop from table listening
    DB_Conn = sqlite3.connect(Site_DB)
    cur = DB_Conn.cursor()
    cur.execute('DELETE FROM listening WHERE user=? AND site=?', (chan, site))
    DB_Conn.commit()
    # if no users are following site, drop from sites
    cur.execute('SELECT 1 FROM listening WHERE site=?', (site, ))
    if not cur.fetchall():
        cur.execute('DELETE FROM sites WHERE site=?', (site, ))
        DB_Conn.commit()
    DB_Conn.close()
def cmd_src(arg, user, chan):
    speak(chan, "%s: my source code can be seen at: %s" % (user, Src_URL))
def cmd_version(arg, user, chan):
    speak(chan, "I am 'Scoopbot' version %s." % Ver)
def cmd_uptime(arg, user, chan):
    uptime_txt = ""
    uptime = (datetime.now() - time_last_conn)
    days = uptime.days
    hours = uptime.seconds / 3600
    minutes = (uptime.seconds % 3600) / 60
    uptime_txt += '%dd %dh %dm' % (days, hours, minutes)
    # Speak the uptime
    speak(chan, "%s: time since my last reconnect : %s" %
          (user, uptime_txt))
Commands = {
    "help": cmd_help,
    "version": cmd_version,
    "src": cmd_src,
    "uptime": cmd_uptime,
    "list": cmd_list,
    "add": cmd_add,
    "remove": cmd_remove
    }
##############################################################################
# All valid received lines end up here
def eat_line(user, chan, text, action):
    # If the line was a command for this bot, process; otherwise ignore.
    if text.startswith(Prefix):
        cmd = text.partition(Prefix)[2].strip()
        cmd = [x.strip() for x in cmd.split(' ', 1)]
        if len(cmd) == 1:
            arg = ""
        else:
            arg = cmd[1]
        # Dispatch this command...
        command = cmd[0]
        logging.debug("Dispatching command '%s' with arg '%s'.." %
                      (command, arg))
        func = Commands.get(command)
        # If this command is undefined:
        if func is None:
            logging.debug("Invalid command: %s" % command)
            # Utter the 'help' text as response to the sad command
            cmd_help("", user, chan)
        else:
            # Is defined command, dispatch it:
            func(arg, user, chan)
##############################################################################
# IRCate; if disconnected, reconnect
def run():
    while 1:
        irc()
        logging.warning("Disconnected, will reconnect...")
# Run continuously.
ircer = threading.Thread(target=run, args=())
ircer.start()
scooper = threading.Thread(target=check_rss, args=())
scooper.start()
ircer.join()
scooper.join()