From 3348a533a38e177255c5a9031abfebd652ee43a4 Mon Sep 17 00:00:00 2001
From: Erik Ekman
Date: Sun, 20 Sep 2009 21:10:41 +0000
Subject: [PATCH] #76 merge server code

---
 src/iodined.c | 905 +++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 742 insertions(+), 163 deletions(-)

diff --git a/src/iodined.c b/src/iodined.c
index 8babe62..d4314cf 100644
--- a/src/iodined.c
+++ b/src/iodined.c
@@ -87,7 +87,7 @@ static char *__progname;
 
 static int read_dns(int, int, struct query *);
 static void write_dns(int, struct query *, char *, int, char);
-static void handle_full_packet(int, int);
+static void handle_full_packet(int, int, int);
 
 static void
 sigint(int sig)
@@ -156,6 +156,250 @@ send_raw(int fd, char *buf, int buflen, int user, int cmd, struct query *q)
 }
 
+static void
+start_new_outpacket(int userid, char *data, int datalen)
+/* Copies data to .outpacket and resets all counters.
+   data is expected to be compressed already. */
+{
+	datalen = MIN(datalen, sizeof(users[userid].outpacket.data));
+	memcpy(users[userid].outpacket.data, data, datalen);
+	users[userid].outpacket.len = datalen;
+	users[userid].outpacket.offset = 0;
+	users[userid].outpacket.sentlen = 0;
+	users[userid].outpacket.seqno = (users[userid].outpacket.seqno + 1) & 7;
+	users[userid].outpacket.fragment = 0;
+	users[userid].outfragresent = 0;
+}
+
+#ifdef OUTPACKETQ_LEN
+
+static int
+save_to_outpacketq(int userid, char *data, int datalen)
+/* Find space in outpacket-queue and store data (expected compressed already).
+   Returns: 1 = okay, 0 = no space. */
+{
+	int fill;
+
+	if (users[userid].outpacketq_filled >= OUTPACKETQ_LEN)
+		/* no space */
+		return 0;
+
+	fill = users[userid].outpacketq_nexttouse +
+	       users[userid].outpacketq_filled;
+	if (fill >= OUTPACKETQ_LEN)
+		fill -= OUTPACKETQ_LEN;
+
+	datalen = MIN(datalen, sizeof(users[userid].outpacketq[fill].data));
+	memcpy(users[userid].outpacketq[fill].data, data, datalen);
+	users[userid].outpacketq[fill].len = datalen;
+
+	users[userid].outpacketq_filled++;
+
+	if (debug >= 3)
+		fprintf(stderr, " Qstore, now %d\n",
+			users[userid].outpacketq_filled);
+
+	return 1;
+}
+
+static int
+get_from_outpacketq(int userid)
+/* Starts new outpacket from queue, if any.
+   Returns: 1 = okay, 0 = no packets were waiting. */
+{
+	int use;
+
+	if (users[userid].outpacketq_filled <= 0)
+		/* no packets */
+		return 0;
+
+	use = users[userid].outpacketq_nexttouse;
+
+	start_new_outpacket(userid, users[userid].outpacketq[use].data,
+			    users[userid].outpacketq[use].len);
+
+	use++;
+	if (use >= OUTPACKETQ_LEN)
+		use = 0;
+	users[userid].outpacketq_nexttouse = use;
+	users[userid].outpacketq_filled--;
+
+	if (debug >= 3)
+		fprintf(stderr, " Qget, now %d\n",
+			users[userid].outpacketq_filled);
+
+	return 1;
+}
+
+#endif /* OUTPACKETQ_LEN */
+
+#ifdef DNSCACHE_LEN
+
+/* On the DNS cache:
+
+   This cache is implemented to better handle the aggressively impatient DNS
+   servers that very quickly re-send requests when we choose to not
+   immediately answer them in lazy mode. This cache works much better than
+   pruning(=dropping) the improper requests, since the DNS server will
+   actually get an answer instead of silence.
+
+   We normally use either CMC (ping) or seqno/frag (upstream data) to prevent
+   cache hits on in-between caching DNS servers. Also, the iodine client is
+   designed to mostly operate properly when cached results are returned.
+
+   Two cache-hit situations:
+   - Repeated DNS query when our ack got lost: has same seqno/frag and doesn't
+     have CMC; but the client will not have sent any new data or pings
+     in-between, so this is always cacheable. Even in lazy mode, since we send
+     the first answer to the actual DNS query only on receipt of the first
+     client retransmit.
+   - Identical second+ fragment of mod-8 packets ago, same seqno/frag and no
+     TCP counter in those fragments to tell them apart. This is _not_
+     cacheable, so our cache length should never exceed 7 packets.
+*/
+
+static void
+save_to_dnscache(int userid, struct query *q, char *answer, int answerlen)
+/* Store answer in our little DNS cache. */
+{
+	int fill;
+
+	if (answerlen > sizeof(users[userid].dnscache_answer[fill]))
+		return;  /* can't store this */
+
+	fill = users[userid].dnscache_lastfilled + 1;
+	if (fill >= DNSCACHE_LEN)
+		fill = 0;
+
+	memcpy(&(users[userid].dnscache_q[fill]), q, sizeof(struct query));
+	memcpy(users[userid].dnscache_answer[fill], answer, answerlen);
+	users[userid].dnscache_answerlen[fill] = answerlen;
+
+	users[userid].dnscache_lastfilled = fill;
+}
+
+static int
+answer_from_dnscache(int dns_fd, int userid, struct query *q)
+/* Checks cache and sends repeated answer if we already saw this query recently.
+   Returns: 1 = answer sent, drop this query, 0 = no answer sent, this is
+   a new query. */
+{
+	int i;
+	int use;
+
+	for (i = 0; i < DNSCACHE_LEN ; i++) {
+		/* Try cache most-recent-first */
+		use = users[userid].dnscache_lastfilled - i;
+		if (use < 0)
+			use += DNSCACHE_LEN;
+
+		if (users[userid].dnscache_q[use].id == 0)
+			continue;
+		if (users[userid].dnscache_answerlen[use] <= 0)
+			continue;
+
+		if (users[userid].dnscache_q[use].type != q->type ||
+		    strcmp(users[userid].dnscache_q[use].name, q->name))
+			continue;
+
+		/* okay, match */
+		write_dns(dns_fd, q, users[userid].dnscache_answer[use],
+			  users[userid].dnscache_answerlen[use],
+			  users[userid].downenc);
+
+		q->id = 0;	/* this query was used */
+		return 1;
+	}
+
+	/* here only when no match found */
+	return 0;
+}
+
+#endif /* DNSCACHE_LEN */
+
+static int
+send_chunk_or_dataless(int dns_fd, int userid, struct query *q)
+/* Sends current fragment to user, or dataless packet if there is no
+   current fragment available (-> normal "quiet" ping reply).
+   Does not update anything, except:
+   - discards q always (query is used)
+   - forgets entire users[userid].outpacket if it was sent in one go,
+     and then tries to get new packet from outpacket-queue
+   Returns: 1 = can call us again immediately, new packet from queue;
+            0 = don't call us again for now.
+*/
+{
+	char pkt[4096];
+	int datalen = 0;
+	int last = 0;
+
+	/* If re-sent too many times, drop entire packet */
+	if (users[userid].outpacket.len > 0 &&
+	    users[userid].outfragresent > 5) {
+		users[userid].outpacket.len = 0;
+		users[userid].outpacket.offset = 0;
+		users[userid].outpacket.sentlen = 0;
+		users[userid].outfragresent = 0;
+
+#ifdef OUTPACKETQ_LEN
+		/* Maybe more in queue, use immediately */
+		get_from_outpacketq(userid);
+#endif
+	}
+
+	if (users[userid].outpacket.len > 0) {
+		datalen = MIN(users[userid].fragsize, users[userid].outpacket.len - users[userid].outpacket.offset);
+		datalen = MIN(datalen, sizeof(pkt)-2);
+
+		memcpy(&pkt[2], users[userid].outpacket.data + users[userid].outpacket.offset, datalen);
+		users[userid].outpacket.sentlen = datalen;
+		last = (users[userid].outpacket.len == users[userid].outpacket.offset + datalen);
+
+		users[userid].outfragresent++;
+	}
+
+	/* Build downstream data header (see doc/proto_xxxxxxxx.txt) */
+
+	/* First byte is 1 bit compression flag, 3 bits upstream seqno, 4 bits upstream fragment */
+	pkt[0] = (1<<7) | ((users[userid].inpacket.seqno & 7) << 4) |
+		 (users[userid].inpacket.fragment & 15);
+	/* Second byte is 3 bits downstream seqno, 4 bits downstream fragment, 1 bit last flag */
+	pkt[1] = ((users[userid].outpacket.seqno & 7) << 5) |
+		 ((users[userid].outpacket.fragment & 15) << 1) | (last & 1);
+
+	if (debug >= 1) {
+		fprintf(stderr, "OUT pkt seq# %d, frag %d (last=%d), offset %d, fragsize %d, total %d, to user %d\n",
+			users[userid].outpacket.seqno & 7, users[userid].outpacket.fragment & 15,
+			last, users[userid].outpacket.offset, datalen, users[userid].outpacket.len, userid);
+	}
+	write_dns(dns_fd, q, pkt, datalen + 2, users[userid].downenc);
+
+#ifdef DNSCACHE_LEN
+	save_to_dnscache(userid, q, pkt, datalen + 2);
+#endif
+
+	q->id = 0;	/* this query is used */
+	/* .iddupe is _not_ reset on purpose */
+
+	if (datalen > 0 && datalen == users[userid].outpacket.len) {
+		/* Whole packet was sent in one chunk, don't wait for ack */
+		users[userid].outpacket.len = 0;
+		users[userid].outpacket.offset = 0;
+		users[userid].outpacket.sentlen = 0;
+		users[userid].outfragresent = 0;
+
+#ifdef OUTPACKETQ_LEN
+		/* Maybe more in queue, prepare for next time */
+		if (get_from_outpacketq(userid) == 1) {
+			if (debug >= 3)
+				fprintf(stderr, " Chunk & fromqueue: callagain\n");
+			return 1;	/* call us again */
+		}
+#endif
+	}
+
+	return 0;	/* don't call us again */
+}
+
 static int
 tunnel_tun(int tun_fd, int dns_fd)
 {
@@ -179,18 +423,25 @@ tunnel_tun(int tun_fd, int dns_fd)
 	compress2((uint8_t*)out, &outlen, (uint8_t*)in, read, 9);
 
 	if (users[userid].conn == CONN_DNS_NULL) {
-		/* if another packet is queued, throw away this one. TODO build queue */
-		if (users[userid].outpacket.len == 0) {
-			memcpy(users[userid].outpacket.data, out, outlen);
-			users[userid].outpacket.len = outlen;
-			users[userid].outpacket.offset = 0;
-			users[userid].outpacket.sentlen = 0;
-			users[userid].outpacket.seqno = (++users[userid].outpacket.seqno & 7);
-			users[userid].outpacket.fragment = 0;
-			return outlen;
-		} else {
+#ifdef OUTPACKETQ_LEN
+		/* If a packet is being sent, try storing the new one in the queue.
+		   If the queue is full, drop the packet. TCP will hopefully notice
+		   and reduce the packet rate. */
+		if (users[userid].outpacket.len > 0) {
+			save_to_outpacketq(userid, out, outlen);
 			return 0;
 		}
+#endif
+
+		start_new_outpacket(userid, out, outlen);
+
+		/* Start sending immediately if query is waiting */
+		if (users[userid].q_sendrealsoon.id != 0)
+			send_chunk_or_dataless(dns_fd, userid, &users[userid].q_sendrealsoon);
+		else if (users[userid].q.id != 0)
+			send_chunk_or_dataless(dns_fd, userid, &users[userid].q);
+
+		return outlen;
 	} else { /* CONN_RAW_UDP */
 		send_raw(dns_fd, out, outlen, userid, RAW_HDR_CMD_DATA, &users[userid].q);
 		return outlen;
@@ -230,93 +481,40 @@ send_version_response(int fd, version_ack_t ack, uint32_t payload, int userid, s
 }
 
 static void
-send_chunk(int dns_fd, int userid) {
-	char pkt[4096];
-	int datalen;
-	int last;
+process_downstream_ack(int userid, int down_seq, int down_frag)
+/* Process acks from downstream fragments.
+   After this, .offset and .fragment are updated (if ack correct),
+   or .len is set to zero when all is done.
+*/
+{
+	if (users[userid].outpacket.len <= 0)
+		/* No packet to apply acks to */
+		return;
 
-	datalen = MIN(users[userid].fragsize, users[userid].outpacket.len - users[userid].outpacket.offset);
+	if (users[userid].outpacket.seqno != down_seq ||
+	    users[userid].outpacket.fragment != down_frag)
+		/* Not the ack we're waiting for; probably duplicate of old
+		   ack, happens a lot with ping packets */
+		return;
 
-	if (datalen && users[userid].outpacket.sentlen > 0 &&
-	    (
-		users[userid].outpacket.seqno != users[userid].out_acked_seqno ||
-		users[userid].outpacket.fragment != users[userid].out_acked_fragment
-	    )
-	) {
+	/* Received proper ack */
+	users[userid].outpacket.offset += users[userid].outpacket.sentlen;
+	users[userid].outpacket.sentlen = 0;
+	users[userid].outpacket.fragment++;
+	users[userid].outfragresent = 0;
 
-		/* Still waiting on latest ack, send nothing */
-		datalen = 0;
-		last = 0;
-		/* TODO : count down and discard packet if no acks arrive within X queries */
-	} else {
-		memcpy(&pkt[2], &users[userid].outpacket.data[users[userid].outpacket.offset], datalen);
-		users[userid].outpacket.sentlen = datalen;
-		last = (users[userid].outpacket.len == users[userid].outpacket.offset + users[userid].outpacket.sentlen);
-
-		/* Increase fragment# when sending data with offset */
-		if (users[userid].outpacket.offset && datalen)
-			users[userid].outpacket.fragment++;
-	}
-
-	/* Build downstream data header (see doc/proto_xxxxxxxx.txt) */
-
-	/* First byte is 1 bit compression flag, 3 bits upstream seqno, 4 bits upstream fragment */
-	pkt[0] = (1<<7) | ((users[userid].inpacket.seqno & 7) << 4) | (users[userid].inpacket.fragment & 15);
-	/* Second byte is 3 bits downstream seqno, 4 bits downstream fragment, 1 bit last flag */
-	pkt[1] = ((users[userid].outpacket.seqno & 7) << 5) |
-		((users[userid].outpacket.fragment & 15) << 1) | (last & 1);
-
-	if (debug >= 1) {
-		fprintf(stderr, "OUT pkt seq# %d, frag %d (last=%d), offset %d, fragsize %d, total %d, to user %d\n",
-			users[userid].outpacket.seqno & 7, users[userid].outpacket.fragment & 15,
-			last, users[userid].outpacket.offset, datalen, users[userid].outpacket.len, userid);
-	}
-	write_dns(dns_fd, &users[userid].q, pkt, datalen + 2, users[userid].downenc);
-	users[userid].q.id = 0;
-
-	if (users[userid].outpacket.len > 0 &&
-	    users[userid].outpacket.len == users[userid].outpacket.sentlen) {
-
-		/* Whole packet was sent in one chunk, dont wait for ack */
+	/* Is packet done? */
+	if (users[userid].outpacket.offset == users[userid].outpacket.len) {
 		users[userid].outpacket.len = 0;
 		users[userid].outpacket.offset = 0;
-		users[userid].outpacket.sentlen = 0;
-	}
-}
+		users[userid].outpacket.fragment--;	/* unneeded ++ above */
+		/* last seqno/frag are always returned on pings */
+		/* users[userid].outfragresent = 0; already above */
 
-static void
-update_downstream_seqno(int dns_fd, int userid, int down_seq, int down_frag)
-{
-	/* If we just read a new packet from tun we have not sent a fragment of, just send it */
-	if (users[userid].outpacket.len > 0 && users[userid].outpacket.sentlen == 0) {
-		send_chunk(dns_fd, userid);
-		return;
-	}
-
-	/* otherwise, check if we received ack on a fragment and can send next */
-	if (users[userid].outpacket.len > 0 &&
-	    users[userid].outpacket.seqno == down_seq && users[userid].outpacket.fragment == down_frag) {
-
-		if (down_seq != users[userid].out_acked_seqno || down_frag != users[userid].out_acked_fragment) {
-			/* Received ACK on downstream fragment */
-			users[userid].outpacket.offset += users[userid].outpacket.sentlen;
-			users[userid].outpacket.sentlen = 0;
-
-			/* Is packet done? */
-			if (users[userid].outpacket.offset == users[userid].outpacket.len) {
-				users[userid].outpacket.len = 0;
-				users[userid].outpacket.offset = 0;
-				users[userid].outpacket.sentlen = 0;
-			}
-
-			users[userid].out_acked_seqno = down_seq;
-			users[userid].out_acked_fragment = down_frag;
-
-			/* Send reply if waiting */
-			if (users[userid].outpacket.len > 0) {
-				send_chunk(dns_fd, userid);
-			}
-		}
+#ifdef OUTPACKETQ_LEN
+		/* Possibly get new packet from queue */
+		get_from_outpacketq(userid);
+#endif
 	}
 }
 
@@ -334,6 +532,10 @@ handle_null_request(int tun_fd, int dns_fd, struct query *q, int domain_len)
 
 	userid = -1;
 
+	/* Everything here needs at least two chars in the name */
+	if (domain_len < 2)
+		return;
+
 	memcpy(in, q->name, MIN(domain_len, sizeof(in)));
 
 	if(in[0] == 'V' || in[0] == 'v') {
@@ -366,17 +568,38 @@ handle_null_request(int tun_fd, int dns_fd, struct query *q, int domain_len)
 			syslog(LOG_INFO, "accepted version for user #%d from %s",
 				userid, inet_ntoa(tempin->sin_addr));
 			users[userid].q.id = 0;
+			users[userid].q.iddupe = 0;
+			users[userid].q_prev.id = 0;
+			users[userid].q_prev.iddupe = 0;
+			users[userid].q_sendrealsoon.id = 0;
+			users[userid].q_sendrealsoon_new = 0;
 			users[userid].outpacket.len = 0;
 			users[userid].outpacket.offset = 0;
 			users[userid].outpacket.sentlen = 0;
 			users[userid].outpacket.seqno = 0;
 			users[userid].outpacket.fragment = 0;
+			users[userid].outfragresent = 0;
 			users[userid].inpacket.len = 0;
 			users[userid].inpacket.offset = 0;
 			users[userid].inpacket.seqno = 0;
 			users[userid].inpacket.fragment = 0;
 			users[userid].fragsize = 100; /* very safe */
 			users[userid].conn = CONN_DNS_NULL;
+			users[userid].lazy = 0;
+#ifdef OUTPACKETQ_LEN
+			users[userid].outpacketq_nexttouse = 0;
+			users[userid].outpacketq_filled = 0;
+#endif
+#ifdef DNSCACHE_LEN
+			{
+				int i;
+				for (i = 0; i < DNSCACHE_LEN; i++) {
+					users[userid].dnscache_q[i].id = 0;
+					users[userid].dnscache_answerlen[i] = 0;
+				}
+			}
+			users[userid].dnscache_lastfilled = 0;
+#endif
 		} else {
 			/* No space for another user */
 			send_version_response(dns_fd, VERSION_FULL, created_users, 0, q);
@@ -523,6 +746,16 @@ handle_null_request(int tun_fd, int dns_fd, struct query *q, int domain_len)
 			users[userid].downenc = 'R';
 			write_dns(dns_fd, q, "Raw", 3, users[userid].downenc);
 			break;
+		case 'L':
+		case 'l':
+			users[userid].lazy = 1;
+			write_dns(dns_fd, q, "Lazy", 4, users[userid].downenc);
+			break;
+		case 'I':
+		case 'i':
+			users[userid].lazy = 0;
+			write_dns(dns_fd, q, "Immediate", 9, users[userid].downenc);
+			break;
 		default:
 			write_dns(dns_fd, q, "BADCODEC", 8, users[userid].downenc);
 			break;
@@ -578,36 +811,166 @@ handle_null_request(int tun_fd, int dns_fd, struct query *q, int domain_len)
 	} else if(in[0] == 'P' || in[0] == 'p') {
 		int dn_seq;
 		int dn_frag;
-
+		int didsend = 0;
+
+		/* We can't handle id=0, that's "no packet" to us. So drop
+		   request completely. Note that DNS servers rewrite the id.
+		   We'll drop 1 in 64k times. If DNS server retransmits with
+		   different id, then all okay.
+		   Else client won't retransmit, and we'll just keep the
+		   previous ping in cache, no problem either. */
+		if (q->id == 0)
+			return;
+
 		read = unpack_data(unpacked, sizeof(unpacked), &(in[1]), domain_len - 1, b32);
+		if (read < 4)
+			return;
+
 		/* Ping packet, store userid */
 		userid = unpacked[0];
 		if (check_user_and_ip(userid, q) != 0) {
 			write_dns(dns_fd, q, "BADIP", 5, 'T');
 			return; /* illegal id */
 		}
+
+#ifdef DNSCACHE_LEN
+		/* Check if cached */
+		if (answer_from_dnscache(dns_fd, userid, q)) {
+			/* Answer sent. But if this is our currently waiting
+			   request in the queue, invalidate now since we can't
+			   be sure that our coming new answer will ever reach
+			   client. Happens on 3+ retransmits in the "lost pings
+			   problem" with aggressive DNS server.
+			*/
+			if (users[userid].q.id != 0 &&
+			    q->type == users[userid].q.type &&
+			    !strcmp(q->name, users[userid].q.name))
+				users[userid].q.id = 0;
+			return;
+		}
+#endif
+
+		/* Dupe pruning */
+		if (users[userid].q.iddupe != 0 &&
+		    q->type == users[userid].q.type &&
+		    !strcmp(q->name, users[userid].q.name) &&
+		    users[userid].lazy) {
+			/* We have this ping already. Aggressively impatient
+			   DNS servers resend queries with _different_ id.
+			   But hostname check is sufficient, includes CMC.
+			   Just drop this ping.
+			   If we already answered it (e.g. data available some
+			   milliseconds ago), DNS server should have noticed
+			   by now (race condition, happens rarely).
+			   If we didn't answer yet, we'll do later (to the
+			   first id, thank you very much). */
+			if (debug >= 2) {
+				fprintf(stderr, "PING pkt from user %d = dupe from impatient DNS server, ignoring\n",
+					userid);
+			}
+			return;
+		}
+
+		if (users[userid].q_prev.iddupe != 0 &&
+		    q->type == users[userid].q_prev.type &&
+		    !strcmp(q->name, users[userid].q_prev.name) &&
+		    users[userid].lazy) {
+			/* Okay, even older ping that we already saw
+			   and probably answered just milliseconds ago.
+			   This is a race condition that aggressive DNS servers
+			   actually train into; happens quite often.
+			   Just drop this new version. */
+			/* If using dnscache, this new query probably got a
+			   cached answer already, and this shouldn't trigger. */
+			if (debug >= 2) {
+				fprintf(stderr, "PING pkt from user %d = dupe (previous) from impatient DNS server, ignoring\n",
+					userid);
+			}
+			return;
+		}
+
+		if (users[userid].q_sendrealsoon.id != 0 &&
+		    q->type == users[userid].q_sendrealsoon.type &&
+		    !strcmp(q->name, users[userid].q_sendrealsoon.name)) {
+			/* Outer select loop will send answer immediately. */
+			if (debug >= 2) {
+				fprintf(stderr, "PING pkt from user %d = dupe from impatient DNS server, ignoring\n",
+					userid);
+			}
+			return;
+		}
 
-		if (debug >= 1) {
-			fprintf(stderr, "PING pkt from user %d\n", userid);
-		}
-
-		if (users[userid].q.id != 0) {
-			/* Send reply on earlier query before overwriting */
-			send_chunk(dns_fd, userid);
-		}
-
 		dn_seq = unpacked[1] >> 4;
 		dn_frag = unpacked[1] & 15;
+
+		if (debug >= 1) {
+			fprintf(stderr, "PING pkt from user %d, ack for downstream %d/%d\n",
+				userid, dn_seq, dn_frag);
+		}
+
+		process_downstream_ack(userid, dn_seq, dn_frag);
+
+		if (debug >= 3) {
+			fprintf(stderr, "PINGret (if any) will ack upstream %d/%d\n",
+				users[userid].inpacket.seqno, users[userid].inpacket.fragment);
+		}
+
+		/* If there is a query that must be returned real soon, do it.
+		   May contain new downstream data if the ping had a new ack.
+		   Otherwise, may also be re-sending old data. */
+		if (users[userid].q_sendrealsoon.id != 0) {
+			send_chunk_or_dataless(dns_fd, userid, &users[userid].q_sendrealsoon);
+		}
+
+		/* We need to store a new query, so if there still is an
+		   earlier query waiting, always send a reply to finish it.
+		   May contain new downstream data if the ping had a new ack.
+		   Otherwise, may also be re-sending old data.
+		   (This is duplicate data if we had q_sendrealsoon above.) */
+		if (users[userid].q.id != 0) {
+			didsend = 1;
+			if (send_chunk_or_dataless(dns_fd, userid, &users[userid].q) == 1)
+				/* new packet from queue, send immediately */
+				didsend = 0;
+		}
+
+		/* Save previous query for dupe checking */
+		memcpy(&(users[userid].q_prev), &(users[userid].q),
+		       sizeof(struct query));
+
+		/* Save new query and time info */
 		memcpy(&(users[userid].q), q, sizeof(struct query));
 		users[userid].last_pkt = time(NULL);
 
-		/* Update seqno and maybe send immediate response packet */
-		update_downstream_seqno(dns_fd, userid, dn_seq, dn_frag);
+		/* If anything waiting and we didn't already send above, send
+		   it now. And always send immediately if we're not lazy
+		   (then above won't have sent at all). */
+		if ((!didsend && users[userid].outpacket.len > 0) ||
+		    !users[userid].lazy)
+			send_chunk_or_dataless(dns_fd, userid, &users[userid].q);
+
 	} else if((in[0] >= '0' && in[0] <= '9') || (in[0] >= 'a' && in[0] <= 'f') || (in[0] >= 'A' && in[0] <= 'F')) {
+		int up_seq, up_frag, dn_seq, dn_frag, lastfrag;
+		int upstream_ok = 1;
+		int didsend = 0;
+		int thisisdupe = 0;
 		int code = -1;
 
+		/* Need 4char header + >=1 char data */
+		if (domain_len < 5)
+			return;
+
+		/* We can't handle id=0, that's "no packet" to us. So drop
+		   request completely. Note that DNS servers rewrite the id.
+		   We'll drop 1 in 64k times. If DNS server retransmits with
+		   different id, then all okay.
+		   Else client doesn't get our ack, and will retransmit in
+		   1 second. */
+		if (q->id == 0)
+			return;
+
 		if ((in[0] >= '0' && in[0] <= '9'))
 			code = in[0] - '0';
 		if ((in[0] >= 'a' && in[0] <= 'f'))
@@ -619,47 +982,152 @@ handle_null_request(int tun_fd, int dns_fd, struct query *q, int domain_len)
 		/* Check user and sending ip number */
 		if (check_user_and_ip(userid, q) != 0) {
 			write_dns(dns_fd, q, "BADIP", 5, 'T');
-		} else {
-			/* Decode data header */
-			int up_seq = (b32_8to5(in[1]) >> 2) & 7;
-			int up_frag = ((b32_8to5(in[1]) & 3) << 2) | ((b32_8to5(in[2]) >> 3) & 3);
-			int dn_seq = (b32_8to5(in[2]) & 7);
-			int dn_frag = b32_8to5(in[3]) >> 1;
-			int lastfrag = b32_8to5(in[3]) & 1;
+			return; /* illegal id */
+		}
 
-			if (users[userid].q.id != 0) {
-				/* Send reply on earlier query before overwriting */
-				send_chunk(dns_fd, userid);
-			}
+#ifdef DNSCACHE_LEN
+		/* Check if cached */
+		if (answer_from_dnscache(dns_fd, userid, q)) {
+			/* Answer sent. But if this is our currently waiting
+			   request in the queue, invalidate now since we can't
+			   be sure that our coming new answer will ever reach
+			   client. Happens on 3+ retransmits in the "lost pings
+			   problem" with aggressive DNS server.
+			*/
+			if (users[userid].q.id != 0 &&
+			    q->type == users[userid].q.type &&
+			    !strcmp(q->name, users[userid].q.name))
+				users[userid].q.id = 0;
+			return;
+		}
+#endif
 
-			/* Update query and time info for user */
-			users[userid].last_pkt = time(NULL);
-			memcpy(&(users[userid].q), q, sizeof(struct query));
+		/* Dupe pruning */
+		if (users[userid].q.iddupe != 0 &&
+		    q->id == users[userid].q.iddupe &&
+		    q->type == users[userid].q.type &&
+		    !strcmp(q->name, users[userid].q.name) &&
+		    users[userid].lazy) {
+			/* We have this exact query already, with same id.
+			   So this is surely an honest dupe. */
+			if (debug >= 2) {
+				fprintf(stderr, "IN pkt from user %d = dupe from impatient DNS server, ignoring\n",
+					userid);
+			}
+			return;
+		}
+		/* Note: Upstream data packet retransmits have exact same
+		   hostname, so can't reliably ignore the id here.
+		   And that's not even needed because of send_ping_soon in
+		   client. Nice. We still do need a queue-flush on data1-data1,
+		   see thisisdupe.
+		   But then there's the race condition in two variants:
+		   data1 - ping - data1
+		   data1 - data2 - data1
+		   These are surely dupes, irrespective of id, because client
+		   will only send ping/data2 when it has received our ack for
+		   data1. (Okay, and ping/data2 should be dupe-pruned
+		   themselves already...)
+		   Draw pictures if you don't understand immediately.
+		*/
+		/* If using dnscache, the new data1 probably got a
+		   cached answer already, and this shouldn't trigger. */
+		if (users[userid].q.iddupe != 0 &&
+		    (q->type != users[userid].q.type ||
+		     strcmp(q->name, users[userid].q.name)) &&
+		    users[userid].q_prev.iddupe != 0 &&
+		    q->type == users[userid].q_prev.type &&
+		    !strcmp(q->name, users[userid].q_prev.name) &&
+		    users[userid].lazy) {
+			if (debug >= 2) {
+				fprintf(stderr, "IN pkt from user %d = dupe (previous) from impatient DNS server, ignoring\n",
+					userid);
+			}
+			return;
+		}
+
+		if (users[userid].q_sendrealsoon.id != 0 &&
+		    q->type == users[userid].q_sendrealsoon.type &&
+		    !strcmp(q->name, users[userid].q_sendrealsoon.name)) {
+			/* Outer select loop will send answer immediately. */
+			if (debug >= 2) {
+				fprintf(stderr, "IN pkt from user %d = dupe from impatient DNS server, ignoring\n",
+					userid);
+			}
+			return;
+		}
+
+		/* We need to flush our queue on dupes, since our new answer
+		   to the first query may/will be duplicated by DNS caches to
+		   also answer the client's re-sent (=dupe) query.
+		   (Caches take TTL=0 to mean: "good for current and earlier
+		   queries") */
+		if (users[userid].q.iddupe != 0 &&
+		    q->type == users[userid].q.type &&
+		    !strcmp(q->name, users[userid].q.name))
+			thisisdupe = 1;
 
-			if (up_seq == users[userid].inpacket.seqno &&
-			    up_frag <= users[userid].inpacket.fragment) {
-				/* Got repeated old packet, skip it */
-				if (debug >= 1) {
-					fprintf(stderr, "IN pkt seq# %d, frag %d, dropped duplicate\n",
-						up_seq, up_frag);
-				}
-				/* Update seqno and maybe send immediate response packet */
-				update_downstream_seqno(dns_fd, userid, dn_seq, dn_frag);
-				return;
-			}
-			if (up_seq != users[userid].inpacket.seqno) {
-				/* New packet has arrived */
-				users[userid].inpacket.seqno = up_seq;
-				users[userid].inpacket.len = 0;
-				users[userid].inpacket.offset = 0;
+
+		/* Decode data header */
+		up_seq = (b32_8to5(in[1]) >> 2) & 7;
+		up_frag = ((b32_8to5(in[1]) & 3) << 2) | ((b32_8to5(in[2]) >> 3) & 3);
+		dn_seq = (b32_8to5(in[2]) & 7);
+		dn_frag = b32_8to5(in[3]) >> 1;
+		lastfrag = b32_8to5(in[3]) & 1;
+
+		process_downstream_ack(userid, dn_seq, dn_frag);
+
+		if (up_seq == users[userid].inpacket.seqno &&
+		    up_frag <= users[userid].inpacket.fragment) {
+			/* Got repeated old packet _with data_, probably
+			   because client didn't receive our ack. So re-send
+			   our ack(+data) immediately to keep things flowing
+			   fast.
+			   If it's a _really_ old frag, it's a nameserver
+			   that tries again, and sending our current (non-
+			   matching) fragno won't be a problem. */
+			if (debug >= 1) {
+				fprintf(stderr, "IN pkt seq# %d, frag %d, dropped duplicate frag\n",
+					up_seq, up_frag);
 			}
+			upstream_ok = 0;
+		}
+		else if (up_seq != users[userid].inpacket.seqno &&
+			 recent_seqno(users[userid].inpacket.seqno, up_seq)) {
+			/* Duplicate of recent upstream data packet; probably
+			   need to answer this to keep DNS server happy */
+			if (debug >= 1) {
+				fprintf(stderr, "IN pkt seq# %d, frag %d, dropped duplicate recent seqno\n",
+					up_seq, up_frag);
+			}
+			upstream_ok = 0;
+		}
+		else if (up_seq != users[userid].inpacket.seqno) {
+			/* Really new packet has arrived, no recent duplicate */
+			/* Forget any old packet, even if incomplete */
+			users[userid].inpacket.seqno = up_seq;
 			users[userid].inpacket.fragment = up_frag;
+			users[userid].inpacket.len = 0;
+			users[userid].inpacket.offset = 0;
+		} else {
+			/* seq is same, frag is higher; don't care about
+			   missing fragments, TCP checksum will fail */
+			users[userid].inpacket.fragment = up_frag;
+		}
+
+		if (debug >= 3) {
+			fprintf(stderr, "INpack with upstream %d/%d, we are going to ack upstream %d/%d\n",
+				up_seq, up_frag,
+				users[userid].inpacket.seqno, users[userid].inpacket.fragment);
+		}
 
+		if (upstream_ok) {
 			/* decode with this users encoding */
 			read = unpack_data(unpacked, sizeof(unpacked), &(in[4]), domain_len - 4,
 					   users[userid].encoder);
 
 			/* copy to packet buffer, update length */
+			read = MIN(read, sizeof(users[userid].inpacket.data) - users[userid].inpacket.offset);
 			memcpy(users[userid].inpacket.data + users[userid].inpacket.offset, unpacked, read);
 			users[userid].inpacket.len += read;
 			users[userid].inpacket.offset += read;
@@ -668,12 +1136,91 @@ handle_null_request(int tun_fd, int dns_fd, struct query *q, int domain_len)
 				fprintf(stderr, "IN pkt seq# %d, frag %d (last=%d), fragsize %d, total %d, from user %d\n",
 					up_seq, up_frag, lastfrag, read, users[userid].inpacket.len, userid);
 			}
+		}
 
-			if (lastfrag & 1) { /* packet is complete */
-				handle_full_packet(tun_fd, userid);
+		if (upstream_ok && lastfrag) { /* packet is complete */
+			handle_full_packet(tun_fd, dns_fd, userid);
+		}
+
+		/* If there is a query that must be returned real soon, do it.
+		   Includes an ack of the just received upstream fragment,
+		   may contain new data. */
+		if (users[userid].q_sendrealsoon.id != 0) {
+			didsend = 1;
+			if (send_chunk_or_dataless(dns_fd, userid, &users[userid].q_sendrealsoon) == 1)
+				/* new packet from queue, send immediately */
+				didsend = 0;
+		}
+
+		/* If we already have an earlier query waiting, we need to
+		   get rid of it to store the new query.
+		   - If we have new data waiting and not yet sent above,
+		     send immediately.
+		   - If this wasn't the last upstream fragment, then we expect
+		     more, so ack immediately if we didn't already.
+		   - If we are in non-lazy mode, there should be no query
+		     waiting, but if there is, send immediately.
+		   - If we are flushing queue due to dupe, send immediately.
+		   - In all other cases (mostly the last-fragment cases),
+		     we can afford to wait just a tiny little while for the
+		     TCP ack to arrive from our tun. Note that this works best
+		     when there is only one client.
+		*/
+		if (users[userid].q.id != 0) {
+			if ((users[userid].outpacket.len > 0 && !didsend) ||
+			    (upstream_ok && !lastfrag && !didsend) ||
+			    (!upstream_ok && !didsend) ||
+			    !users[userid].lazy ||
+			    thisisdupe) {
+				didsend = 1;
+				if (send_chunk_or_dataless(dns_fd, userid, &users[userid].q) == 1)
+					/* new packet from queue, send immediately */
+					didsend = 0;
+			} else {
+				memcpy(&(users[userid].q_sendrealsoon),
+				       &(users[userid].q),
+				       sizeof(struct query));
+				users[userid].q_sendrealsoon_new = 1;
+				users[userid].q.id = 0;	/* used */
+				didsend = 1;
+			}
+		}
+
+		/* Save previous query for dupe checking */
+		memcpy(&(users[userid].q_prev), &(users[userid].q),
+		       sizeof(struct query));
+
+		/* Save new query and time info */
+		memcpy(&(users[userid].q), q, sizeof(struct query));
+		users[userid].last_pkt = time(NULL);
+
+		/* If we still need to ack this upstream frag, do it to keep
+		   upstream flowing.
+		   - If we have new data waiting and not yet sent above,
+		     send immediately.
+		   - If we are flushing queue due to dupe, send immediately.
+		   - If this wasn't the last upstream fragment, then we expect
+		     more, so ack immediately if we didn't already or are
+		     in non-lazy mode.
+		   - If this was the last fragment, and we didn't ack already
+		     or are in non-lazy mode, send the ack after just a tiny
+		     little while so that the TCP ack may have arrived from
+		     our tun device.
+		   - In all other cases, don't send anything now.
+		*/
+		if ((users[userid].outpacket.len > 0 && !didsend)
+		    || thisisdupe)
+			send_chunk_or_dataless(dns_fd, userid, &users[userid].q);
+		else if (!didsend || !users[userid].lazy) {
+			if (upstream_ok && lastfrag) {
+				memcpy(&(users[userid].q_sendrealsoon),
+				       &(users[userid].q),
+				       sizeof(struct query));
+				users[userid].q_sendrealsoon_new = 1;
+				users[userid].q.id = 0;	/* used */
+			} else {
+				send_chunk_or_dataless(dns_fd, userid, &users[userid].q);
 			}
-			/* Update seqno and maybe send immediate response packet */
-			update_downstream_seqno(dns_fd, userid, dn_seq, dn_frag);
 		}
 	}
 }
 
@@ -848,16 +1395,30 @@ tunnel(int tun_fd, int dns_fd, int bind_fd)
 	struct timeval tv;
 	fd_set fds;
 	int i;
+	int userid;
 
 	while (running) {
 		int maxfd;
-		if (users_waiting_on_reply()) {
-			tv.tv_sec = 0;
-			tv.tv_usec = 15000;
-		} else {
-			tv.tv_sec = 1;
-			tv.tv_usec = 0;
-		}
+		tv.tv_sec = 10; /* doesn't really matter */
+		tv.tv_usec = 0;
+
+		/* Adjust timeout if there is anything to send realsoon.
+		   Clients won't be sending new data until we send our ack,
+		   so don't keep them waiting long. This only triggers at
+		   final upstream fragments, which is about once per eight
+		   requests during heavy upstream traffic.
+		   20msec: ~8 packs every 1/50sec = ~400 DNSreq/sec,
+		   or ~1200bytes every 1/50sec = ~0.5 Mbit/sec upstream */
+		for (userid = 0; userid < USERS; userid++) {
+			if (users[userid].active && !users[userid].disabled &&
+			    users[userid].last_pkt + 60 > time(NULL)) {
+				users[userid].q_sendrealsoon_new = 0;
+				if (users[userid].q_sendrealsoon.id != 0) {
+					tv.tv_sec = 0;
+					tv.tv_usec = 20000;
+				}
+			}
+		}
 
 		FD_ZERO(&fds);
@@ -870,7 +1431,8 @@ tunnel(int tun_fd, int dns_fd, int bind_fd)
 			maxfd = MAX(bind_fd, maxfd);
 		}
 
-		/* TODO : use some kind of packet queue */
+		/* Don't read from tun if no users can accept data anyway;
+		   tun queue/TCP buffers are larger than our outpacket-queues */
 		if(!all_users_waiting_to_send()) {
 			FD_SET(tun_fd, &fds);
 			maxfd = MAX(tun_fd, maxfd);
@@ -885,33 +1447,34 @@ tunnel(int tun_fd, int dns_fd, int bind_fd)
 		}
 
 		if (i==0) {
-			int j;
-			for (j = 0; j < USERS; j++) {
-				if (users[j].q.id != 0 && users[j].conn == CONN_DNS_NULL) {
-					send_chunk(dns_fd, j);
-				}
-			}
+			/* timeout; whatever; doesn't matter anymore */
 		} else {
 			if (FD_ISSET(tun_fd, &fds)) {
 				tunnel_tun(tun_fd, dns_fd);
-				continue;
 			}
 			if (FD_ISSET(dns_fd, &fds)) {
 				tunnel_dns(tun_fd, dns_fd, bind_fd);
-				continue;
 			}
 			if (FD_ISSET(bind_fd, &fds)) {
 				tunnel_bind(bind_fd, dns_fd);
-				continue;
 			}
 		}
+
+		/* Send realsoon's if tun or dns didn't already */
+		for (userid = 0; userid < USERS; userid++)
+			if (users[userid].active && !users[userid].disabled &&
+			    users[userid].last_pkt + 60 > time(NULL) &&
+			    users[userid].q_sendrealsoon.id != 0 &&
+			    users[userid].conn == CONN_DNS_NULL &&
+			    !users[userid].q_sendrealsoon_new)
+				send_chunk_or_dataless(dns_fd, userid, &users[userid].q_sendrealsoon);
 	}
 
 	return 0;
 }
 
 static void
-handle_full_packet(int tun_fd, int userid)
+handle_full_packet(int tun_fd, int dns_fd, int userid)
 {
 	unsigned long outlen;
 	char out[64*1024];
@@ -932,17 +1495,33 @@ handle_full_packet(int tun_fd, int dns_fd, int userid)
 		/* send the uncompressed packet to tun device */
 		write_tun(tun_fd, out, outlen);
 	} else {
-		/* send the compressed packet to other client
-		 * if another packet is queued, throw away this one. TODO build queue */
+		/* send the compressed(!) packet to other client */
 		if (users[touser].outpacket.len == 0) {
-			memcpy(users[touser].outpacket.data, users[userid].inpacket.data, users[userid].inpacket.len);
-			users[touser].outpacket.len = users[userid].inpacket.len;
+			start_new_outpacket(touser,
+				users[userid].inpacket.data,
+				users[userid].inpacket.len);
+
+			/* Start sending immediately if query is waiting */
+			if (users[touser].q_sendrealsoon.id != 0)
+				send_chunk_or_dataless(dns_fd, touser, &users[touser].q_sendrealsoon);
+			else if (users[touser].q.id != 0)
+				send_chunk_or_dataless(dns_fd, touser, &users[touser].q);
+#ifdef OUTPACKETQ_LEN
+		} else {
+			save_to_outpacketq(touser,
+				users[userid].inpacket.data,
+				users[userid].inpacket.len);
+#endif
 		}
 	}
 	} else {
-		fprintf(stderr, "Discarded data, uncompress() result: %d\n", ret);
+		if (debug >= 1)
+			fprintf(stderr, "Discarded data, uncompress() result: %d\n", ret);
 	}
-	users[userid].inpacket.len = users[userid].inpacket.offset = 0;
+
+	/* This packet is done */
+	users[userid].inpacket.len = 0;
+	users[userid].inpacket.offset = 0;
 }
 
 static void
@@ -997,7 +1576,7 @@ handle_raw_data(char *packet, int len, struct query *q, int dns_fd, int tun_fd,
 			users[userid].inpacket.len, userid);
 	}
 
-	handle_full_packet(tun_fd, userid);
+	handle_full_packet(tun_fd, dns_fd, userid);
 }
 
 static void
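
Editor's note (appended after the patch, not part of it): send_chunk_or_dataless() builds the two-byte downstream header inline with bit operations, per the comments in the patch ("1 bit compression flag, 3 bits upstream seqno, 4 bits upstream fragment" / "3 bits downstream seqno, 4 bits downstream fragment, 1 bit last flag"). A minimal standalone sketch of that same layout as a pack/unpack pair; the helper names dh_pack/dh_unpack are hypothetical, only the bit layout is taken from the patch:

	#include <stdint.h>

	/* byte 0: compression flag (bit 7), upstream seqno (bits 6-4),
	 *         upstream fragment (bits 3-0)
	 * byte 1: downstream seqno (bits 7-5), downstream fragment (bits 4-1),
	 *         last flag (bit 0) */
	static void
	dh_pack(uint8_t pkt[2], int comp, int up_seq, int up_frag,
	        int dn_seq, int dn_frag, int last)
	{
		pkt[0] = (uint8_t) (((comp & 1) << 7) | ((up_seq & 7) << 4) |
		                    (up_frag & 15));
		pkt[1] = (uint8_t) (((dn_seq & 7) << 5) | ((dn_frag & 15) << 1) |
		                    (last & 1));
	}

	static void
	dh_unpack(const uint8_t pkt[2], int *comp, int *up_seq, int *up_frag,
	          int *dn_seq, int *dn_frag, int *last)
	{
		*comp    = (pkt[0] >> 7) & 1;
		*up_seq  = (pkt[0] >> 4) & 7;
		*up_frag = pkt[0] & 15;
		*dn_seq  = (pkt[1] >> 5) & 7;
		*dn_frag = (pkt[1] >> 1) & 15;
		*last    = pkt[1] & 1;
	}

Packing then unpacking round-trips all six fields, which is an easy way to unit-test the masks and shifts against the protocol document (doc/proto_xxxxxxxx.txt).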
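Editor's note: save_to_outpacketq()/get_from_outpacketq() are a fixed-size ring buffer driven by a "next to use" index plus a "filled" count, with no separate head/tail pointers. A condensed sketch of just that index arithmetic, with hypothetical names (QLEN stands in for OUTPACKETQ_LEN, and the payload is reduced to an int for brevity):

	#define QLEN 4	/* stand-in for OUTPACKETQ_LEN */

	struct ringq {
		int nexttouse;	/* index of oldest stored element */
		int filled;	/* number of stored elements */
		int slot[QLEN];
	};

	static int
	ringq_put(struct ringq *q, int v)
	/* Returns: 1 = stored, 0 = no space. */
	{
		int fill;

		if (q->filled >= QLEN)
			return 0;
		fill = q->nexttouse + q->filled;
		if (fill >= QLEN)
			fill -= QLEN;
		q->slot[fill] = v;
		q->filled++;
		return 1;
	}

	static int
	ringq_get(struct ringq *q, int *v)
	/* Returns: 1 = got oldest element, 0 = queue empty. */
	{
		if (q->filled <= 0)
			return 0;
		*v = q->slot[q->nexttouse];
		q->nexttouse++;
		if (q->nexttouse >= QLEN)
			q->nexttouse = 0;
		q->filled--;
		return 1;
	}

Storing only a count (instead of a tail index) avoids the classic full-vs-empty ambiguity of head/tail rings, at the cost of one extra add-and-wrap on insert.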
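Editor's note: the "dropped duplicate recent seqno" branch calls recent_seqno(), which is defined elsewhere in iodined.c and not shown in this hunk. A plausible mod-8 recency check consistent with how it is used here (treat the current upstream seqno and a few immediately preceding ones as "recent") could look like this sketch; the window size of 4 is an assumption, not taken from the patch:

	static int
	recent_seqno(int ourseqno, int gotseqno)
	/* Return 1 if gotseqno equals ourseqno or one of the few seqnos
	   just before it (mod 8); 0 if it is genuinely new or very old.
	   Window of 4 is assumed for illustration. */
	{
		int i;

		for (i = 0; i < 4; i++, ourseqno--) {
			if (ourseqno < 0)
				ourseqno = 7;
			if (gotseqno == ourseqno)
				return 1;
		}
		return 0;
	}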
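Editor's note: the dupe-pruning blocks repeat the same comparison (query type equal, query name equal, sometimes id equal) against q, q_prev and q_sendrealsoon. If that were ever refactored, a tiny helper would make the intent explicit. Hypothetical sketch only; the struct below is a minimal stand-in for iodine's real struct query, reduced to the fields these comparisons use:

	#include <string.h>

	struct query {		/* minimal stand-in, not iodine's full struct */
		unsigned short id;
		unsigned short type;
		char name[256];
	};

	static int
	same_question(const struct query *a, const struct query *b)
	/* 1 if b repeats a's DNS question (type and name match).
	   The id is deliberately not compared here: as the patch comments
	   note, impatient DNS servers re-send queries with a different id,
	   so callers check ids separately where they matter. */
	{
		return a->type == b->type && strcmp(a->name, b->name) == 0;
	}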