Initial 'rewrite' of hatbot

This commit is contained in:
Siina Mashek 2022-04-19 13:27:39 +03:00
parent efef9cd7d9
commit ac113944cd
12 changed files with 428 additions and 1 deletions

5
.gitignore vendored
View File

@ -1,3 +1,8 @@
# Project specific
*.kate-swp
*.log
config/owncast.json
# ---> Python # ---> Python
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

13
ATTRIBUTION.md Normal file
View File

@ -0,0 +1,13 @@
# Code Attribution
## ameliabot.logger
`ameliabot.logger` is based on [colargulog]() by David Ohana under the [Apache-2.0 License](). Changed for `flake8` compliance.
## ameliabot.owncast
`ameliabot.owncast` originally started as [hatbot]() by hatniX, licensed under the [Unlicense](), which this project also uses.
[colargulog]:https://github.com/davidohana/colargulog
[Apache-2.0 License]:https://www.apache.org/licenses/LICENSE-2.0.txt
[Unlicense]:https://unlicense.org/

View File

@ -1,3 +1,31 @@
# ameliabot # ameliabot
Ameliabot, a chat bot for owncast. A chatbot for owncast, inspiration from [hatbot]() by [hatniX]().
Code attributions can be found in `ATTRIBUTIONS.md`
## Requirements
* [Owncast]() server
* Python 3
* Flask
* psycopg2
## Setup
* Install Flask and psycopg2
* `pip --user install Flask psycopg2`
* If you don't want to fiddle with psycopg2, you can install `python-psycopg2` from your distribution's package manager
* Create an Owncast webhook url pointing to your bot's location
* http://localhost:5000/webhook/owncast if bot and owncast are on the same machine
* Copy `config-example.json` to `config.json` and fill out the information required
Ameliabot can be run by executing `start.sh` or:
```
FLASK_APP=bot.py python3 -m flask run
```
Please use a proper uWSGI proxy if the bot is not on the same machine as owncast.
[Owncast]:https://owncast.online
[hatbot]:https://github.com/hatniX/hatbot
[hatniX]:https://hatnix.net

1
alias.json Normal file
View File

@ -0,0 +1 @@
{}

30
ameliabot/init_logging.py Normal file
View File

@ -0,0 +1,30 @@
"""
colargulog
Python3 Logging with Colored Arguments and new string formatting style
Written by david.ohana@ibm.com
License: Apache-2.0
"""
import sys
from ameliabot.logger import ColorizedArgsFormatter, BraceFormatStyleFormatter
from ameliabot.logger import logging
def init_logging():
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
console_level = "DEBUG"
console_handler = logging.StreamHandler(stream=sys.stdout)
console_handler.setLevel(console_level)
console_format = "%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s" # NOQA
colored_formatter = ColorizedArgsFormatter(console_format)
console_handler.setFormatter(colored_formatter)
root_logger.addHandler(console_handler)
file_handler = logging.FileHandler("app.log")
file_level = "DEBUG"
file_handler.setLevel(file_level)
file_format = "%(asctime)s - %(name)s (%(lineno)s) - %(levelname)-8s - %(threadName)-12s - %(message)s" # NOQA
file_handler.setFormatter(BraceFormatStyleFormatter(file_format))
root_logger.addHandler(file_handler)

134
ameliabot/logger.py Normal file
View File

@ -0,0 +1,134 @@
"""
colargulog
Python3 Logging with Colored Arguments and new string formatting style
Written by david.ohana@ibm.com
License: Apache-2.0
"""
import logging
import logging.handlers
import re
class ColorCodes:
grey = "\x1b[38;21m"
green = "\x1b[1;32m"
yellow = "\x1b[33;21m"
red = "\x1b[31;21m"
bold_red = "\x1b[31;1m"
blue = "\x1b[1;34m"
light_blue = "\x1b[1;36m"
purple = "\x1b[1;35m"
reset = "\x1b[0m"
class ColorizedArgsFormatter(logging.Formatter):
arg_colors = [ColorCodes.purple, ColorCodes.light_blue]
level_fields = ["levelname", "levelno"]
level_to_color = {
logging.DEBUG: ColorCodes.grey,
logging.INFO: ColorCodes.green,
logging.WARNING: ColorCodes.yellow,
logging.ERROR: ColorCodes.red,
logging.CRITICAL: ColorCodes.bold_red,
}
def __init__(self, fmt: str):
super().__init__()
self.level_to_formatter = {}
def add_color_format(level: int):
color = ColorizedArgsFormatter.level_to_color[level]
_format = fmt
for fld in ColorizedArgsFormatter.level_fields:
search = r"(%\(" + fld + r"\).*?s)"
_format = re.sub(search, f"{color}\\1{ColorCodes.reset}", _format) # NOQA
formatter = logging.Formatter(_format)
self.level_to_formatter[level] = formatter
add_color_format(logging.DEBUG)
add_color_format(logging.INFO)
add_color_format(logging.WARNING)
add_color_format(logging.ERROR)
add_color_format(logging.CRITICAL)
@staticmethod
def rewrite_record(record: logging.LogRecord):
if not BraceFormatStyleFormatter.is_brace_format_style(record):
return
msg = record.msg
msg = msg.replace("{", "_{{")
msg = msg.replace("}", "_}}")
placeholder_count = 0
# add ANSI escape code for next alternating
# color before each formatting parameter
# and reset color after it.
while True:
if "_{{" not in msg:
break
color_index = placeholder_count % len(ColorizedArgsFormatter.arg_colors) # NOQA
color = ColorizedArgsFormatter.arg_colors[color_index]
msg = msg.replace("_{{", color + "{", 1)
msg = msg.replace("_}}", "}" + ColorCodes.reset, 1)
placeholder_count += 1
record.msg = msg.format(*record.args)
record.args = []
def format(self, record):
orig_msg = record.msg
orig_args = record.args
formatter = self.level_to_formatter.get(record.levelno)
self.rewrite_record(record)
formatted = formatter.format(record)
# restore log record to original state for other handlers
record.msg = orig_msg
record.args = orig_args
return formatted
class BraceFormatStyleFormatter(logging.Formatter):
def __init__(self, fmt: str):
super().__init__()
self.formatter = logging.Formatter(fmt)
@staticmethod
def is_brace_format_style(record: logging.LogRecord):
if len(record.args) == 0:
return False
msg = record.msg
if '%' in msg:
return False
count_of_start_param = msg.count("{")
count_of_end_param = msg.count("}")
if count_of_start_param != count_of_end_param:
return False
if count_of_start_param != len(record.args):
return False
return True
@staticmethod
def rewrite_record(record: logging.LogRecord):
if not BraceFormatStyleFormatter.is_brace_format_style(record):
return
record.msg = record.msg.format(*record.args)
record.args = []
def format(self, record):
orig_msg = record.msg
orig_args = record.args
self.rewrite_record(record)
formatted = self.formatter.format(record)
# restore log record to original state for other handlers
record.msg = orig_msg
record.args = orig_args
return formatted

117
ameliabot/owncast.py Normal file
View File

@ -0,0 +1,117 @@
import html
import json
import random
import requests
from requests.structures import CaseInsensitiveDict
from ameliabot.logger import logging
from ameliabot.quote import Quote
from flask import request
bot_version = "0.0.1"
# Get basic owncast bot config
with open("config/owncast.json", "r") as f:
config = json.load(f)
logging.info("Configuration loaded")
with open("commands.json", "r") as f:
commands = json.load(f)
logging.info("Commands loaded")
with open("alias.json", "r") as f:
aliases = json.load(f)
logging.info("Aliases loaded")
quote = Quote(
database=config["quote_db_name"],
user=config["quote_db_user"],
password=config["quote_db_pass"],
)
# prepare the header for the bot posts
headers = CaseInsensitiveDict()
headers["Content-Type"] = "application/json"
headers["Authorization"] = "Bearer {}".format(config["access_token"])
def parse_webhook():
hook_type = request.json["type"]
data = request.json["eventData"]
if "CHAT" in hook_type:
data = process_chat(data)
if data:
logging.debug("Processed: {}".format(data))
data = '{"body": "%s"}' % data
resp = requests.post(
config["owncast_server"],
headers=headers,
data=data.encode('utf-8'))
if resp.status_code == 200:
logging.debug("RESP: {}".format(data))
else:
logging.error("Status code {} returned".format(
str(resp.status_code)))
def get_command(text):
first_word = text.split()[0].lower()
if first_word in commands:
return commands[first_word]
elif first_word in aliases:
return commands[first_word]
def get_quote(num):
try:
num = int(num)
except TypeError:
pass
except ValueError:
pass
q = quote.get(num)
return html.escape(q)
def process_chat(data):
timestamp = "{} {}".format(
data["timestamp"][0:10], data["timestamp"][11:19])
sender = data["user"]["displayName"]
text = data["body"]
command_reply = get_command(text)
logging.info("{} <{}> {}".format(timestamp, sender, text))
if command_reply:
try:
first_parameter = text.split(" ")[1]
except IndexError:
first_parameter = None
# Move this dictionary to a configuration file or something
# This is so bad, it runs everything every command :pikajoy:
placeholders = {
"sender": sender,
"streamer": config["streamer_name"],
"target": first_parameter,
"random": str(random.randrange(1, 100, 1)),
"commands": ", ".join(list(commands.keys())),
"aliases": ", ".join(list(aliases.keys())),
"bot_version": bot_version,
"quote": get_quote(first_parameter),
"quote_parameters": "Not implemented yet",
}
for placeholder in placeholders:
findme = "{%s}" % placeholder
if findme in command_reply:
if placeholders[placeholder] == "":
command_reply = "%s required" % placeholder.upper()
command_reply = command_reply.replace(
findme, placeholders[placeholder])
return command_reply

56
ameliabot/quote.py Normal file
View File

@ -0,0 +1,56 @@
from datetime import datetime
import random
import psycopg2
class Quote:
def __init__(self, database, user, password):
self.conn = psycopg2.connect(database, user, password)
self.num_quotes = self._get_num_quotes()
def insert(self, owner, submitter, text):
cur = self.conn.cursor()
text = text.replace("'", "''")
cur.execute('''
INSERT INTO quotes (owner, submitter, text, timestamp)
VALUES ('{}', '{}', E'{}', current_timestamp)'''.format(
owner, submitter, text))
self.conn.commit()
cur.close()
self.num_quotes += 1
def get(self, arg=None):
if arg:
cur = self.conn.cursor()
ret = "SELECT number, text, timestamp FROM quotes WHERE "
try:
arg = int(arg)
if arg > self.num_quotes:
return "No quote matching that number."
ret += "number = {}".format(arg)
except ValueError:
ret += "text like '%{}%'".format(arg.lower())
cur.execute(ret)
quotes = cur.fetchall()
cur.close()
return self._format(quotes)
else:
return self.get(random.randint(0, self.num_quotes))
def _get_num_quotes(self):
cur = self.conn.cursor()
cur.execute("SELECT COUNT(id) FROM quotes")
return cur.fetchone()[0]
def _format(self, quotes):
if len(quotes) >= 1:
data = quotes[random.randint(0, len(quotes)-1)]
else:
try:
data = quotes[0]
except IndexError:
return "No quote."
return "{}. {}, {}".format(
data[0], data[1], datetime.strftime(data[2], '%Y'))

21
bot.py Normal file
View File

@ -0,0 +1,21 @@
#!/usr/bin/env python
from flask import Flask, Response
from ameliabot import owncast
from ameliabot.init_logging import init_logging
from ameliabot.logger import logging
init_logging()
logging.info("Loaded ameliabot v%s" % owncast.bot_version)
# the url of the Owncast API for bot posts
owncast_url = "{}/api/integrations/chat/send".format(
owncast.config["owncast_server"])
app = Flask(__name__)
@app.route('/webhook/owncast', methods=['POST'])
def respond():
owncast.parse_webhook()
return Response(status=200)

12
commands.json Normal file
View File

@ -0,0 +1,12 @@
{
"!help": "Commands: {commands}",
"!backseat": "**No backseating** unless I specifically ask for help.",
"!hydrate": "[HYDRATE] {sender} wants {streamer} to take a drink!",
"!stretch": "[STRETCH] {sender} reminds {streamer} to stretch dat body.",
"!save": "SAVE! **NOW!!!**",
"!love": "There is **{random}% love** between {sender} and {target} <3",
"!slap": "*slaps {target} with a trout >:)*",
"!bot": "I am currently running **ameliabot {bot_version}**",
"!quote": "{quote}",
"!addquote": "{quote_parameters}"
}

View File

@ -0,0 +1,8 @@
{
"owncast_server": "localhost",
"access_token": "",
"streamer_name": "owncast streamer",
"quote_db_name": "",
"quote_db_user": "",
"quote_db_pass": ""
}

2
start.sh Executable file
View File

@ -0,0 +1,2 @@
#!/bin/sh
FLASK_APP=bot.py python3 -m flask run