From 5012b8bb1788f5da3f1684f9408bf352985d9c2f Mon Sep 17 00:00:00 2001 From: Darius Kazemi Date: Mon, 29 Apr 2019 13:36:04 -0700 Subject: [PATCH 1/2] Updating to use beanstalkd queueing system --- README.md | 4 +++- package.json | 1 + queueFeeds.js | 26 ++++++++++++++++++++++++++ updateFeeds.js | 47 +++++++++++++++++++++++++++-------------------- 4 files changed, 57 insertions(+), 21 deletions(-) create mode 100644 queueFeeds.js diff --git a/README.md b/README.md index bf623d9..6d492b5 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,8 @@ This is based on my [Express ActivityPub Server](https://github.com/dariusk/expr This requires Node.js v10.10.0 or above. +You also need `beanstalkd` running. This is a simple and fast queueing system we use to manage polling RSS feeds. [Here are installation instructions](https://beanstalkd.github.io/download.html). On a production server you'll want to [install it as a background process](https://github.com/beanstalkd/beanstalkd/tree/master/adm). + ## Installation Clone the repository, then `cd` into its root directory. Install dependencies: @@ -44,7 +46,7 @@ Go to `https://whateveryourdomainis.com:3000/convert` or whatever port you selec ## Sending out updates to followers -There is also a file called `updateFeeds.js` that needs to be run on a cron job or similar scheduler. I like to run mine once a minute. It queries every RSS feed in the database to see if there has been a change to the feed. If there is a new post, it sends out the new post to everyone subscribed to its corresponding ActivityPub Actor. +There is also a file called `queueFeeds.js` that needs to be run on a cron job or similar scheduler. I like to run mine once a minute. It queries every RSS feed in the database to see if there has been a change to the feed. If there is a new post, it sends out the new post to everyone subscribed to its corresponding ActivityPub Actor. ## Local testing diff --git a/package.json b/package.json index bb25975..79fdf05 100644 --- a/package.json +++ b/package.json @@ -10,6 +10,7 @@ "cors": "^2.8.4", "express": "^4.16.3", "generate-rsa-keypair": "^0.1.2", + "jackd": "^1.2.4", "parse-favicon": "^2.0.0", "pug": "^2.0.3", "request": "^2.87.0", diff --git a/queueFeeds.js b/queueFeeds.js new file mode 100644 index 0000000..2187766 --- /dev/null +++ b/queueFeeds.js @@ -0,0 +1,26 @@ +const Database = require('better-sqlite3'); +const db = new Database('bot-node.db'); +const Jackd = require('jackd'); +const beanstalkd = new Jackd(); + + +async function foo() { + + // get all feeds from DB + let feeds = db.prepare('select feed from feeds').all(); + + console.log('!!!',feeds.length); + + let count = 0; + + await beanstalkd.connect() + + for (feed of feeds) { + await beanstalkd.put(feed.feed) + } + + await beanstalkd.disconnect() + +} + +foo() diff --git a/updateFeeds.js b/updateFeeds.js index b68021b..9ffceaf 100644 --- a/updateFeeds.js +++ b/updateFeeds.js @@ -7,29 +7,37 @@ const db = new Database('bot-node.db'), crypto = require('crypto'), parser = new Parser({timeout: 2000}); -// get all feeds from DB -let feeds = db.prepare('select * from feeds').all(); +const Jackd = require('jackd'); +const beanstalkd = new Jackd(); -console.log('!!!',feeds.length); +beanstalkd.connect() -let count = 0; - -doFeed(); - -function doFeed() { - let feed = feeds[count]; - console.log(count, feed.feed); - if (feed === undefined) { - return; +async function foo() { + while (true) { + try { + const { id, payload } = await beanstalkd.reserve() + console.log(payload) + /* ... process job here ... */ + await doFeed(payload) + await beanstalkd.delete(id) + } catch (err) { + // Log error somehow + console.error(err) + } } +} + +foo() + +function doFeed(feedUrl) { +return new Promise((resolve, reject) => { // fetch new RSS for each feed - parser.parseURL(feed.feed, function(err, feedData) { + parser.parseURL(feedUrl, function(err, feedData) { if (err) { - console.log('error fetching', feed.feed, err); - doFeed(++count); + reject('error fetching ' + feedUrl + '; ' + err); } else { - //console.log(feedData); + let feed = db.prepare('select * from feeds where feed = ?').get(feedUrl); // get the old feed data from the database let oldFeed = JSON.parse(feed.content); @@ -72,15 +80,14 @@ function doFeed() { // update the DB with new contents let content = JSON.stringify(feedData); db.prepare('insert or replace into feeds(feed, username, content) values(?, ?, ?)').run(feed.feed, acct, content); - count = count + 1; - setTimeout(doFeed, 100); + return resolve('done with ' + feedUrl) } else { - count = count + 1; - setTimeout(doFeed, 100); + return resolve('done with ' + feedUrl + ', no change') } } }); +}).catch((e) => console.log(e)); } // TODO: update the display name of a feed if the feed title has changed From 737c418ed7257abe1f1af835c085ecdec193a550 Mon Sep 17 00:00:00 2001 From: Darius Kazemi Date: Tue, 7 May 2019 10:20:21 -0700 Subject: [PATCH 2/2] Fixes to formatting and payload processing order --- updateFeeds.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/updateFeeds.js b/updateFeeds.js index 9ffceaf..bcdbc65 100644 --- a/updateFeeds.js +++ b/updateFeeds.js @@ -18,8 +18,8 @@ async function foo() { const { id, payload } = await beanstalkd.reserve() console.log(payload) /* ... process job here ... */ - await doFeed(payload) await beanstalkd.delete(id) + await doFeed(payload) } catch (err) { // Log error somehow console.error(err) @@ -136,8 +136,8 @@ function transformContent(item) { }); // couple of hacky regexes to make sure we clean up everything - item.content = $('body').html().replace(/^(\n|\r)/,'').replace(/>\r+\r+\r+<').replace(/ +/g, ''); + item.content = item.content.replace(/^(\n|\r)/,'').replace(/>\r+<').replace(/>\s*<').replace(/>\u200B+<').replace(/ +/g, '').replace(/

<\/p>/g,''); return item; }