Merged QMEM and DNS cache; removed implied lazy switch.

Improved immediate-mode handling; however, data queries are not yet added to
QMEM in immediate mode (and immediate mode does not work yet either).
frekky 2015-10-17 22:25:31 +08:00
parent e61b38b9a3
commit c38e7d30a8
4 changed files with 137 additions and 185 deletions

View File

@ -118,18 +118,28 @@ send_raw(int fd, uint8_t *buf, size_t buflen, int user, int cmd, struct sockaddr
sendto(fd, packet, len, 0, (struct sockaddr *) from, fromlen);
}
/* Ringbuffer Query Handling (qmem):
/* Ringbuffer Query Handling (qmem) and DNS Cache:
This is used to make the handling of duplicates and query timeouts simpler
and all in one place.
Using this, lazy mode should be possible with n queries (n <= windowsize)
and all handled in one place.
Using this, lazy mode is possible with n queries (n <= windowsize)
New queries are placed consecutively in the buffer, replacing any old
queries (already responded to) if length == QMEM_LEN. Old queries are kept
to check for dupes etc.
as a record for duplicate requests. If a dupe is found and USE_DNSCACHE is
defined, the previous answer is sent (if it exists), otherwise an invalid
response is sent.
TODO: modify a bit to replace dnscache entirely?
it seems the only difference is qmem doesn't store answers. */
#ifdef QMEM_LEN
On the DNS cache:
This cache is implemented to better handle the aggressively impatient DNS
servers that very quickly re-send requests when we choose to not
immediately answer them in lazy mode. This cache works much better than
pruning (i.e. dropping) the repeated requests, since the DNS server will
actually get an answer instead of silence.
Because of the CMC in both ping and upstream data, unwanted cache hits
are prevented. Due to the combination of CMC and varying sequence IDs, it
is extremely unlikely that any duplicate answers will be incorrectly sent
during a session (given QMEM_LEN is not very large). */
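A minimal client-side sketch of why matching duplicates by name is safe here, assuming the client embeds a fresh CMC (cache-miss counter) in every encoded query; the helper name and domain below are hypothetical, not iodine's actual encoder:
#include <stdio.h>
#include <stdlib.h>
/* Every logical query carries a new CMC, so two byte-identical names can
 * only come from the DNS server re-sending the same query, never from a
 * genuinely new request; qmem can therefore match dupes on (id, type, name). */
static void build_ping_name(char *out, size_t outlen, int userid)
{
	unsigned cmc = rand() & 0xFFFF; /* changes for every query the client builds */
	snprintf(out, outlen, "p%02x%04x.t.example.com", (unsigned) userid & 0xFF, cmc);
}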
#define QMEM_DEBUG(l, u, ...) \
if (debug >= l) {\
@ -141,26 +151,32 @@ send_raw(int fd, uint8_t *buf, size_t buflen, int user, int cmd, struct sockaddr
static void
qmem_init(int userid)
{
memset(&users[userid].qmem, 0, sizeof(struct query_buffer));
memset(&users[userid].qmem, 0, sizeof(struct qmem_buffer));
// users[userid].qmem.end = 1;
for (size_t i = 0; i < QMEM_LEN; i++) {
users[userid].qmem.queries[i].id = -1;
users[userid].qmem.queries[i].q.id = -1;
}
// TODO dns cache init in qmem
}
static int
qmem_is_cached(int dns_fd, int userid, struct query *q)
/* Check if an answer for a particular query is cached in qmem
* If so, sends an "invalid" answer
* If so, sends an "invalid" answer or one from DNS cache
* Returns 1 if new query, 0 if cached (and then answered) */
{
struct query_buffer *buf;
struct qmem_buffer *buf;
struct query *pq;
char *data = "x";
char dataenc = 'T';
size_t len = 1;
int dnscache = 0;
buf = &users[userid].qmem;
/* Check if this is a duplicate query */
for (size_t p = buf->start; p != buf->end; p = (p + 1) % QMEM_LEN) {
pq = &buf->queries[p];
pq = &buf->queries[p].q;
if (pq->id != q->id)
continue;
if (pq->type != q->type)
@ -169,10 +185,19 @@ qmem_is_cached(int dns_fd, int userid, struct query *q)
if (strcasecmp(pq->name, q->name))
continue;
QMEM_DEBUG(2, userid, "OUT for '%s' == duplicate, sending illegal reply", q->name);
/* Aha! A match! */
#ifdef USE_DNSCACHE
if (buf->queries[p].a.len) {
data = (char *)buf->queries[p].a.data;
len = buf->queries[p].a.len;
dataenc = users[userid].downenc;
dnscache = 1;
}
#endif
QMEM_DEBUG(2, userid, "OUT from qmem for '%s', %s", q->name,
dnscache ? "answer from DNS cache" : "sending invalid response");
// TODO cache answers/respond using cache? (merge with dnscache)
write_dns(dns_fd, q, "x", 1, 'T');
write_dns(dns_fd, q, data, len, dataenc);
return 0;
}
return 1;
@ -182,7 +207,7 @@ static int
qmem_append(int userid, struct query *q)
/* Appends incoming query to the buffer. */
{
struct query_buffer *buf;
struct qmem_buffer *buf;
buf = &users[userid].qmem;
if (buf->num_pending >= QMEM_LEN) {
@ -201,30 +226,44 @@ qmem_append(int userid, struct query *q)
QMEM_DEBUG(5, userid, "add query ID %d, timeout %lu ms", q->id, timeval_to_ms(&users[userid].dns_timeout));
/* Copy query into buffer */
memcpy(&buf->queries[buf->end], q, sizeof(struct query));
memcpy(&buf->queries[buf->end].q, q, sizeof(struct query));
#ifdef USE_DNSCACHE
buf->queries[buf->end].a.len = 0;
#endif
buf->end = (buf->end + 1) % QMEM_LEN;
buf->num_pending += 1;
return 1;
}
static void
qmem_answered(int userid)
qmem_answered(int userid, uint8_t *data, size_t len)
/* Call when oldest/first/earliest query added has been answered */
{
struct query_buffer *buf;
struct qmem_buffer *buf;
size_t answered;
buf = &users[userid].qmem;
if (buf->num_pending == 0) {
/* Most likely caused by bugs somewhere else. */
QMEM_DEBUG(1, userid, "can't answer query that has already been answered! Fix bugs.");
QMEM_DEBUG(1, userid, "Query answered with 0 in qmem! Fix bugs.");
return;
}
answered = buf->start_pending;
buf->start_pending = (buf->start_pending + 1) % QMEM_LEN;
buf->num_pending -= 1;
QMEM_DEBUG(3, userid, "query ID %d answered", buf->queries[answered].id);
#ifdef USE_DNSCACHE
/* Add answer to query entry */
if (len && data) {
if (len > 4096) {
QMEM_DEBUG(1, userid, "got answer with length >4096!");
}
memcpy(&buf->queries[answered].a.data, data, MIN(len, 4096));
buf->queries[answered].a.len = MIN(len, 4096);
}
#endif
QMEM_DEBUG(3, userid, "query ID %d answered", buf->queries[answered].q.id);
}
struct query *
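A standalone sketch of the ring-index arithmetic that qmem_append() and qmem_answered() rely on; QMEM_LEN is shrunk to 8 here purely for illustration:
#include <stdio.h>
#define QMEM_LEN 8
int main(void)
{
	size_t start_pending = 0, end = 0, num_pending = 0;
	/* five queries arrive: qmem_append() advances end */
	for (int i = 0; i < 5; i++) {
		end = (end + 1) % QMEM_LEN;
		num_pending++;
	}
	/* three are answered: qmem_answered() advances start_pending */
	for (int i = 0; i < 3; i++) {
		start_pending = (start_pending + 1) % QMEM_LEN;
		num_pending--;
	}
	/* answered slots behind start_pending stay in the ring for dupe checks */
	printf("pending: %zu (indices %zu..%zu)\n", num_pending, start_pending, end);
	return 0;
}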
@ -232,12 +271,12 @@ qmem_get_next_response(int userid)
/* Gets oldest query to be responded to (for lazy mode) or NULL if none available
* The query is NOT marked as "answered" since that is done later. */
{
struct query_buffer *buf;
struct qmem_buffer *buf;
struct query *q;
buf = &users[userid].qmem;
if (buf->length == 0 || buf->num_pending == 0)
return NULL;
q = &buf->queries[buf->start_pending];
q = &buf->queries[buf->start_pending].q;
QMEM_DEBUG(3, userid, "next response using cached query: ID %d", q->id);
return q;
}
@ -247,7 +286,7 @@ qmem_max_wait(struct dnsfd *dns_fds, int *touser, struct query **sendq)
/* Gets max interval before the next query has to be responded to
* Response(s) are sent automatically for queries if:
* - the query has timed out
* - the user has data to send, pending ACKs or ping and spare pending queries
* - the user has data to send or pending ACKs, and spare pending queries
* - the user has excess pending queries (>downstream window size)
* Returns largest safe time to wait before next timeout */
{
@ -266,28 +305,28 @@ qmem_max_wait(struct dnsfd *dns_fds, int *touser, struct query **sendq)
continue;
u = &users[userid];
qnum = u->qmem.start_pending;
if (u->qmem.num_pending == 0)
continue;
/* Keep track of how many fragments we can send */
total = window_sending(u->outgoing);
if (u->qmem.num_pending > u->outgoing->windowsize) {
/* calculate number of "excess" queries */
total = MAX(total, u->qmem.num_pending - u->outgoing->windowsize);
if (u->lazy) {
total = window_sending(u->outgoing);
if (u->qmem.num_pending > u->outgoing->windowsize) {
/* calculate number of "excess" queries */
total = MAX(total, u->qmem.num_pending - u->outgoing->windowsize);
}
} else {
/* User in immediate mode, must answer all pending queries */
total = u->qmem.num_pending;
}
sending = total;
sent = 0;
if ((!u->lazy) && u->qmem.num_pending > 0) {
QMEM_DEBUG(2, userid, "User switched to immediate mode, answering all pending queries...");
sending = u->qmem.num_pending;
}
qnum = u->qmem.start_pending;
for (; qnum != u->qmem.end; qnum = (qnum + 1) % QMEM_LEN) {
q = &u->qmem.queries[qnum];
q = &u->qmem.queries[qnum].q;
/* queries will always be in time order */
timeradd(&q->time_recv, &u->dns_timeout, &timeout);
@ -295,18 +334,15 @@ qmem_max_wait(struct dnsfd *dns_fds, int *touser, struct query **sendq)
/* respond to a query with ping/data if:
* - query has timed out (ping, or data if available)
* - user has pending data (always data)
* - user has pending ACK (either)
* - user has pending ping (always ping, with data if available) */
timersub(&q->time_recv, &now, &age);
* - user has pending ACK (either) */
timersub(&now, &q->time_recv, &age);
age_ms = timeval_to_ms(&age);
/* only consider "immediate" when age is negligible */
immediate = age_ms <= 10;
immediate = llabs(age_ms) <= 10;
if (debug >= 3) {
QMEM_DEBUG(3, userid, "Auto response to cached query: ID %d, %ld ms old (%s), timeout %ld ms",
q->id, age_ms, immediate ? "immediate" : "lazy", timeval_to_ms(&u->dns_timeout));
}
QMEM_DEBUG(3, userid, "Auto response to cached query: ID %d, %ld ms old (%s), timeout %ld ms",
q->id, age_ms, immediate ? "immediate" : "lazy", timeval_to_ms(&u->dns_timeout));
sent++;
QMEM_DEBUG(4, userid, "ANSWER q id %d, ACK %d; sent %lu of %lu + sending another %lu",
@ -351,85 +387,6 @@ qmem_max_wait(struct dnsfd *dns_fds, int *touser, struct query **sendq)
return soonest;
}
#endif /* QMEM_LEN */
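A standalone sketch of the age arithmetic this hunk corrects: the query's age must be computed as now minus time_recv, hence the swapped timersub() arguments; timeval_to_ms is stubbed locally to keep the example self-contained:
#include <sys/time.h>
#include <stdlib.h>
static long long stub_timeval_to_ms(const struct timeval *tv)
{
	return tv->tv_sec * 1000LL + tv->tv_usec / 1000;
}
static int query_is_immediate(const struct timeval *time_recv)
{
	struct timeval now, age;
	gettimeofday(&now, NULL);
	timersub(&now, time_recv, &age); /* age = now - time_recv */
	/* a query counts as "immediate" only if it was received just now */
	return llabs(stub_timeval_to_ms(&age)) <= 10;
}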
#ifdef DNSCACHE_LEN
/* On the DNS cache:
This cache is implemented to better handle the aggressively impatient DNS
servers that very quickly re-send requests when we choose to not
immediately answer them in lazy mode. This cache works much better than
pruning(=dropping) the improper requests, since the DNS server will
actually get an answer instead of silence.
Because of the CMC in both ping and upstream data, unwanted cache hits
are prevented. Data-CMC is only 36 counts, so our cache length should
not exceed 36/2=18 packets. (This quick rule assumes all packets are
otherwise equal, which they aren't: up/downstream seq, TCP/IP headers and
the actual data.)
*/
static void
save_to_dnscache(int userid, struct query *q, char *answer, int answerlen)
/* Store answer in our little DNS cache. */
{
int fill;
if (answerlen > sizeof(users[userid].dnscache_answer[fill]))
return; /* can't store this */
fill = users[userid].dnscache_lastfilled + 1;
if (fill >= DNSCACHE_LEN)
fill = 0;
memcpy(&(users[userid].dnscache_q[fill]), q, sizeof(struct query));
memcpy(users[userid].dnscache_answer[fill], answer, answerlen);
users[userid].dnscache_answerlen[fill] = answerlen;
users[userid].dnscache_lastfilled = fill;
}
static int
answer_from_dnscache(int dns_fd, int userid, struct query *q)
/* Checks cache and sends repeated answer if we already saw this query recently.
Returns: 1 = answer sent, drop this query, 0 = no answer sent, this is
a new query. */
{
int i;
int use;
for (i = 0; i < DNSCACHE_LEN ; i++) {
/* Try cache most-recent-first */
use = users[userid].dnscache_lastfilled - i;
if (use < 0)
use += DNSCACHE_LEN;
if (users[userid].dnscache_q[use].id == 0)
continue;
if (users[userid].dnscache_answerlen[use] <= 0)
continue;
if (users[userid].dnscache_q[use].type != q->type ||
strcmp(users[userid].dnscache_q[use].name, q->name))
continue;
/* okay, match */
DEBUG(1, "OUT user %d %s from dnscache", userid, q->name);
write_dns(dns_fd, q, users[userid].dnscache_answer[use],
users[userid].dnscache_answerlen[use],
users[userid].downenc);
q->id = 0; /* this query was used */
return 1;
}
/* here only when no match found */
return 0;
}
#endif /* DNSCACHE_LEN */
static int
get_dns_fd(struct dnsfd *fds, struct sockaddr_storage *addr)
@ -560,13 +517,8 @@ send_data_or_ping(struct dnsfd *dns_fds, int userid, struct query *q,
write_dns(get_dns_fd(dns_fds, &q->from), q, (char *)pkt,
datalen + headerlen, users[userid].downenc);
#ifdef DNSCACHE_LEN
save_to_dnscache(userid, q, (char *)pkt, datalen + headerlen);
#endif
#ifdef QMEM_LEN
/* mark query as answered */
qmem_answered(userid);
#endif
qmem_answered(userid, pkt, datalen + headerlen);
window_tick(out);
}
@ -1234,7 +1186,6 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
if (version == PROTOCOL_VERSION) {
userid = find_available_user();
if (userid >= 0) {
int i;
struct tun_user *u = &users[userid];
u->seed = rand();
/* Store remote IP number */
@ -1262,16 +1213,7 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
window_buffer_clear(u->outgoing);
window_buffer_clear(u->incoming);
u->next_upstream_ack = -1;
#ifdef QMEM_LEN
qmem_init(userid);
#endif
#ifdef DNSCACHE_LEN
for (i = 0; i < DNSCACHE_LEN; i++) {
u->dnscache_q[i].id = 0;
u->dnscache_answerlen[i] = 0;
}
u->dnscache_lastfilled = 0;
#endif
DEBUG(1, "User %d connected with correct version from %s.",
userid, format_addr(&q->from, q->fromlen));
@ -1620,7 +1562,7 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
} else if(in[0] == 'P' || in[0] == 'p') { /* Ping request */
int dn_seq, up_seq, dn_winsize, up_winsize, dn_ack;
int respond;
unsigned timeout_ms;
unsigned timeout_ms, set_timeout;
struct timeval timeout;
/* We can't handle id=0, that's "no packet" to the dnscache. So drop
@ -1644,16 +1586,9 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
return; /* illegal id */
}
#ifdef DNSCACHE_LEN
/* Check if cached */
if (answer_from_dnscache(dns_fd, userid, q))
return;
#endif // XXX hmm these look very similar...
#ifdef QMEM_LEN
/* Check if cached */
if (!qmem_is_cached(dns_fd, userid, q))
return;
#endif
dn_ack = ((unpacked[8] >> 2) & 1) ? unpacked[1] : -1;
up_winsize = unpacked[2];
@ -1665,29 +1600,36 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
timeout = ms_to_timeval(timeout_ms);
respond = unpacked[8] & 1;
set_timeout = (unpacked[8] >> 3) & 1;
DEBUG(3, "PING pkt user %d, down %d/%d, up %d/%d, ACK %d, %stimeout %u ms, respond %d (flags %02X)",
userid, dn_seq, dn_winsize, up_seq, up_winsize, dn_ack,
set_timeout ? "SET " : "", timeout_ms, respond, unpacked[8]);
if (set_timeout) {
/* update user's query timeout if timeout flag set */
users[userid].dns_timeout = timeout;
/* if timeout is 0, we do not enable lazy mode but it is effectively the same */
int newlazy = !(timeout_ms == 0);
if (newlazy != users[userid].lazy)
DEBUG(2, "User %d: not setting lazymode to %d with timeout %u",
userid, newlazy, timeout_ms);
}
qmem_append(userid, q);
if (respond) {
/* ping handshake - set windowsizes etc, respond NOW using this query
* NOTE: not added to qmem */
DEBUG(2, "PING HANDSHAKE set windowsizes (old/new) up: %d/%d, dn: %d/%d",
users[userid].outgoing->windowsize, dn_winsize, users[userid].incoming->windowsize, up_winsize);
users[userid].outgoing->windowsize = dn_winsize;
users[userid].incoming->windowsize = up_winsize;
send_data_or_ping(dns_fds, userid, q, 1, 1);
} else {
qmem_append(userid, q);
return;
}
if ((unpacked[8] >> 3) & 1) {
/* update user's query timeout if timeout flag set */
users[userid].dns_timeout = timeout;
if (timeout_ms == 0) {
/* immediate mode is implied by server timeout of 0 */
users[userid].lazy = 0;
}
}
DEBUG(3, "PING pkt from user %d, down %d/%d, up %d/%d, ACK %d, set timeout %u ms (flags %02X)",
userid, dn_seq, dn_winsize, up_seq, up_winsize, dn_ack, timeout_ms, unpacked[8]);
user_process_incoming_data(tun_fd, dns_fds, userid, dn_ack);
/* if respond flag not set, query waits in qmem and is used later */
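For reference, a standalone sketch of how the flag bits of unpacked[8] are read in this hunk; only the bits visible above are decoded, and nothing is assumed about the remaining bits:
#include <stdint.h>
struct ping_flags {
	int respond;     /* bit 0: answer this ping immediately (handshake) */
	int has_ack;     /* bit 2: unpacked[1] carries a downstream ACK */
	int set_timeout; /* bit 3: apply the timeout value carried in the ping */
};
static struct ping_flags decode_ping_flags(uint8_t flags)
{
	struct ping_flags f;
	f.respond = flags & 1;
	f.has_ack = (flags >> 2) & 1;
	f.set_timeout = (flags >> 3) & 1;
	return f;
}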
@ -1724,16 +1666,9 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
return; /* illegal IP */
}
#ifdef DNSCACHE_LEN
/* Check if cached */
if (answer_from_dnscache(dns_fd, userid, q))
return;
#endif
#ifdef QMEM_LEN
/* Check if cached */
if (!qmem_is_cached(dns_fd, userid, q))
qmem_append(userid, q);
#endif
/* Decode upstream data header - see docs/proto_XXXXXXXX.txt */
/* First byte (after userid) = CMC (ignored); skip 2 bytes */
len = sizeof(unpacked);
@ -1758,7 +1693,9 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
/* Shouldn't normally happen; will always be reset after sending a packet. */
DEBUG(1, "[WARNING] next_upstream_ack == %d for user %d.", users[userid].next_upstream_ack, userid);
}
users[userid].next_upstream_ack = window_process_incoming_fragment(users[userid].incoming, &f);
window_process_incoming_fragment(users[userid].incoming, &f);
users[userid].next_upstream_ack = f.seqID;
user_process_incoming_data(tun_fd, dns_fds, userid, f.ack_other);

View File

@ -38,13 +38,14 @@
#include <syslog.h>
#endif
#define DNSCACHE_LEN 10
/* Undefine to disable. Should be less than 18; also see comments in iodined.c */
/* Max number of incoming queries to hold at one time (recommended to be same as windowsize)
* Memory = USERS * sizeof(struct qmem_buffer); with USE_DNSCACHE each entry also holds a 4096-byte answer */
#define QMEM_LEN 24
#define USE_DNSCACHE
/* QMEM entries contain additional space for DNS responses.
* Undefine to disable. */
/* Number of fragments in outgoing buffer.
* Mem usage: USERS * (MAX_FRAGLEN * OUTFRAGBUF_LEN + sizeof(struct window_buffer)) */
#define OUTFRAGBUF_LEN 64
@ -87,8 +88,21 @@ typedef enum {
VERSION_FULL
} version_ack_t;
struct query_buffer {
struct query queries[QMEM_LEN];
struct query_answer {
uint8_t data[4096];
size_t len;
};
struct qmem_query {
struct query q;
#ifdef USE_DNSCACHE
struct query_answer a;
#endif
};
/* Struct used for QMEM + DNS cache */
struct qmem_buffer {
struct qmem_query queries[QMEM_LEN];
size_t start_pending; /* index of first "pending" query (i.e. no response yet) */
size_t start; /* index of first stored/pending query */
size_t end; /* index of space after last stored/pending query */
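A back-of-the-envelope check of the per-user memory these definitions imply; struct query is stubbed with a placeholder size, so the figures are only indicative:
#include <stdio.h>
#include <stdint.h>
#define QMEM_LEN 24
struct query_stub { char name[512]; int id; int type; }; /* placeholder for struct query */
struct qmem_query_stub {
	struct query_stub q;
	struct { uint8_t data[4096]; size_t len; } a; /* answer slot added by USE_DNSCACHE */
};
int main(void)
{
	/* roughly 24 * (4096 + ~520) bytes, i.e. on the order of 110 KB per user */
	printf("qmem per user: ~%zu bytes\n", QMEM_LEN * sizeof(struct qmem_query_stub));
	return 0;
}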

View File

@ -132,10 +132,19 @@ all_users_waiting_to_send()
outgoing buffer, so that sending back-to-back is possible
without going through another select loop. */
{
for (int i = 0; i < usercount; i++)
if (user_active(i))
int numactive = 0;
for (int i = 0; i < usercount; i++) {
if (user_active(i)) {
if (users[i].outgoing->length - users[i].outgoing->numitems > 8)
return 0;
numactive ++;
}
}
/* no users waiting if there are no users */
if (numactive == 0)
return 0;
return 1;
}
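A standalone restatement of the per-user check above, to make the heuristic explicit; the threshold of 8 free fragment slots is this commit's choice and the stub type is illustrative:
#include <stddef.h>
struct outgoing_stub { size_t length; size_t numitems; }; /* window buffer stub */
/* A user counts as "waiting to send" once their outgoing fragment buffer has
 * 8 or fewer free slots; when every active user is in this state, reading
 * more data from the tun device is pointless until fragments are flushed. */
static int user_waiting_to_send(const struct outgoing_stub *out)
{
	return (out->length - out->numitems) <= 8;
}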

View File

@ -47,15 +47,7 @@ struct tun_user {
int fragsize;
enum connection conn;
int lazy;
#ifdef QMEM_LEN
struct query_buffer qmem;
#endif
#ifdef DNSCACHE_LEN
struct query dnscache_q[DNSCACHE_LEN];
char dnscache_answer[DNSCACHE_LEN][4096];
int dnscache_answerlen[DNSCACHE_LEN];
int dnscache_lastfilled;
#endif
struct qmem_buffer qmem;
};
extern struct tun_user *users;