Updating to use beanstalkd queueing system

This commit is contained in:
Darius Kazemi 2019-04-29 13:36:04 -07:00
parent 7cfe50605c
commit 5012b8bb17
4 changed files with 57 additions and 21 deletions

View File

@ -8,6 +8,8 @@ This is based on my [Express ActivityPub Server](https://github.com/dariusk/expr
This requires Node.js v10.10.0 or above. This requires Node.js v10.10.0 or above.
You also need `beanstalkd` running. This is a simple and fast queueing system we use to manage polling RSS feeds. [Here are installation instructions](https://beanstalkd.github.io/download.html). On a production server you'll want to [install it as a background process](https://github.com/beanstalkd/beanstalkd/tree/master/adm).
## Installation ## Installation
Clone the repository, then `cd` into its root directory. Install dependencies: Clone the repository, then `cd` into its root directory. Install dependencies:
@ -44,7 +46,7 @@ Go to `https://whateveryourdomainis.com:3000/convert` or whatever port you selec
## Sending out updates to followers ## Sending out updates to followers
There is also a file called `updateFeeds.js` that needs to be run on a cron job or similar scheduler. I like to run mine once a minute. It queries every RSS feed in the database to see if there has been a change to the feed. If there is a new post, it sends out the new post to everyone subscribed to its corresponding ActivityPub Actor. There is also a file called `queueFeeds.js` that needs to be run on a cron job or similar scheduler. I like to run mine once a minute. It queries every RSS feed in the database to see if there has been a change to the feed. If there is a new post, it sends out the new post to everyone subscribed to its corresponding ActivityPub Actor.
## Local testing ## Local testing

View File

@ -10,6 +10,7 @@
"cors": "^2.8.4", "cors": "^2.8.4",
"express": "^4.16.3", "express": "^4.16.3",
"generate-rsa-keypair": "^0.1.2", "generate-rsa-keypair": "^0.1.2",
"jackd": "^1.2.4",
"parse-favicon": "^2.0.0", "parse-favicon": "^2.0.0",
"pug": "^2.0.3", "pug": "^2.0.3",
"request": "^2.87.0", "request": "^2.87.0",

26
queueFeeds.js Normal file
View File

@ -0,0 +1,26 @@
const Database = require('better-sqlite3');
const db = new Database('bot-node.db');
const Jackd = require('jackd');
const beanstalkd = new Jackd();
async function foo() {
// get all feeds from DB
let feeds = db.prepare('select feed from feeds').all();
console.log('!!!',feeds.length);
let count = 0;
await beanstalkd.connect()
for (feed of feeds) {
await beanstalkd.put(feed.feed)
}
await beanstalkd.disconnect()
}
foo()

View File

@ -7,29 +7,37 @@ const db = new Database('bot-node.db'),
crypto = require('crypto'), crypto = require('crypto'),
parser = new Parser({timeout: 2000}); parser = new Parser({timeout: 2000});
// get all feeds from DB const Jackd = require('jackd');
let feeds = db.prepare('select * from feeds').all(); const beanstalkd = new Jackd();
console.log('!!!',feeds.length); beanstalkd.connect()
let count = 0; async function foo() {
while (true) {
doFeed(); try {
const { id, payload } = await beanstalkd.reserve()
function doFeed() { console.log(payload)
let feed = feeds[count]; /* ... process job here ... */
console.log(count, feed.feed); await doFeed(payload)
if (feed === undefined) { await beanstalkd.delete(id)
return; } catch (err) {
// Log error somehow
console.error(err)
}
} }
}
foo()
function doFeed(feedUrl) {
return new Promise((resolve, reject) => {
// fetch new RSS for each feed // fetch new RSS for each feed
parser.parseURL(feed.feed, function(err, feedData) { parser.parseURL(feedUrl, function(err, feedData) {
if (err) { if (err) {
console.log('error fetching', feed.feed, err); reject('error fetching ' + feedUrl + '; ' + err);
doFeed(++count);
} }
else { else {
//console.log(feedData); let feed = db.prepare('select * from feeds where feed = ?').get(feedUrl);
// get the old feed data from the database // get the old feed data from the database
let oldFeed = JSON.parse(feed.content); let oldFeed = JSON.parse(feed.content);
@ -72,15 +80,14 @@ function doFeed() {
// update the DB with new contents // update the DB with new contents
let content = JSON.stringify(feedData); let content = JSON.stringify(feedData);
db.prepare('insert or replace into feeds(feed, username, content) values(?, ?, ?)').run(feed.feed, acct, content); db.prepare('insert or replace into feeds(feed, username, content) values(?, ?, ?)').run(feed.feed, acct, content);
count = count + 1; return resolve('done with ' + feedUrl)
setTimeout(doFeed, 100);
} }
else { else {
count = count + 1; return resolve('done with ' + feedUrl + ', no change')
setTimeout(doFeed, 100);
} }
} }
}); });
}).catch((e) => console.log(e));
} }
// TODO: update the display name of a feed if the feed title has changed // TODO: update the display name of a feed if the feed title has changed