diff --git a/app/proxyman/inbound/worker.go b/app/proxyman/inbound/worker.go index 0d4ae494..4c561977 100644 --- a/app/proxyman/inbound/worker.go +++ b/app/proxyman/inbound/worker.go @@ -279,7 +279,8 @@ func (w *udpWorker) callback(b *buf.Buffer, source net.Destination, originalDest src: source, } if originalDest.IsValid() { - id.dest = originalDest + //id.dest = originalDest + b.UDP = &originalDest } conn, existing := w.getConnection(id) diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index ccc0f974..ce273c5d 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -173,26 +173,30 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in if d.sockopt != nil { sockopt.Mark = d.sockopt.Mark } - tConn, err := internet.DialSystem(ctx, net.DestinationFromAddr(conn.RemoteAddr()), sockopt) + to := net.DestinationFromAddr(conn.RemoteAddr()) + tConn, err := internet.DialSystem(ctx, to, sockopt) if err != nil { return err } - defer tConn.Close() - - writer = &buf.SequentialWriter{Writer: tConn} - tReader := buf.NewPacketReader(tConn) - requestCount++ - tproxyRequest = func() error { - defer func() { - if atomic.AddInt32(&requestCount, -1) == 0 { - timer.SetTimeout(plcy.Timeouts.DownlinkOnly) + writer = NewPacketWriter(tConn, &dest, ctx, &to, sockopt) + defer writer.(*PacketWriter).Close() + /* + defer tConn.Close() + writer = &buf.SequentialWriter{Writer: tConn} + tReader := buf.NewPacketReader(tConn) + requestCount++ + tproxyRequest = func() error { + defer func() { + if atomic.AddInt32(&requestCount, -1) == 0 { + timer.SetTimeout(plcy.Timeouts.DownlinkOnly) + } + }() + if err := buf.Copy(tReader, link.Writer, buf.UpdateActivity(timer)); err != nil { + return newError("failed to transport request (TPROXY conn)").Base(err) } - }() - if err := buf.Copy(tReader, link.Writer, buf.UpdateActivity(timer)); err != nil { - return newError("failed to transport request (TPROXY conn)").Base(err) + return nil } - return nil - } + */ } } @@ -215,3 +219,65 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in return nil } + +func NewPacketWriter(conn net.Conn, d *net.Destination, ctx context.Context, to *net.Destination, sockopt *internet.SocketConfig) buf.Writer { + writer := &PacketWriter{ + conn: conn, + conns: make(map[net.Destination]net.Conn), + ctx: ctx, + to: to, + sockopt: sockopt, + } + writer.conns[*d] = conn + return writer +} + +type PacketWriter struct { + conn net.Conn + conns map[net.Destination]net.Conn + ctx context.Context + to *net.Destination + sockopt *internet.SocketConfig +} + +func (w *PacketWriter) WriteMultiBuffer(mb buf.MultiBuffer) error { + for { + mb2, b := buf.SplitFirst(mb) + mb = mb2 + if b == nil { + break + } + var err error + if b.UDP != nil && b.UDP.Address.Family().IsIP() { + conn := w.conns[*b.UDP] + if conn == nil { + w.sockopt.BindAddress = b.UDP.Address.IP() + w.sockopt.BindPort = uint32(b.UDP.Port) + conn, _ = internet.DialSystem(w.ctx, *w.to, w.sockopt) + if conn == nil { + b.Release() + continue + } + w.conns[*b.UDP] = conn + } + _, err = conn.Write(b.Bytes()) + } else { + _, err = w.conn.Write(b.Bytes()) + } + b.Release() + if err != nil { + buf.ReleaseMulti(mb) + return err + } + } + return nil +} + +func (w *PacketWriter) Close() error { + for _, conn := range w.conns { + if conn != nil { + conn.Close() + } + } + return nil +} diff --git a/proxy/socks/client.go b/proxy/socks/client.go index e409cc75..77c306b5 100644 --- a/proxy/socks/client.go +++ b/proxy/socks/client.go @@ -138,11 +138,12 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter defer udpConn.Close() requestFunc = func() error { defer timer.SetTimeout(p.Timeouts.DownlinkOnly) - return buf.Copy(link.Reader, &buf.SequentialWriter{Writer: NewUDPWriter(request, udpConn)}, buf.UpdateActivity(timer)) + writer := &UDPWriter{Writer: udpConn, Request: request} + return buf.Copy(link.Reader, writer, buf.UpdateActivity(timer)) } responseFunc = func() error { defer timer.SetTimeout(p.Timeouts.UplinkOnly) - reader := &UDPReader{reader: udpConn} + reader := &UDPReader{Reader: udpConn} return buf.Copy(reader, link.Writer, buf.UpdateActivity(timer)) } } diff --git a/proxy/socks/protocol.go b/proxy/socks/protocol.go index b32166b5..e35f9ad6 100644 --- a/proxy/socks/protocol.go +++ b/proxy/socks/protocol.go @@ -360,47 +360,59 @@ func EncodeUDPPacket(request *protocol.RequestHeader, data []byte) (*buf.Buffer, } type UDPReader struct { - reader io.Reader -} - -func NewUDPReader(reader io.Reader) *UDPReader { - return &UDPReader{reader: reader} + Reader io.Reader } func (r *UDPReader) ReadMultiBuffer() (buf.MultiBuffer, error) { - b := buf.New() - if _, err := b.ReadFrom(r.reader); err != nil { + buffer := buf.New() + _, err := buffer.ReadFrom(r.Reader) + if err != nil { + buffer.Release() return nil, err } - if _, err := DecodeUDPPacket(b); err != nil { + u, err := DecodeUDPPacket(buffer) + if err != nil { + buffer.Release() return nil, err } - return buf.MultiBuffer{b}, nil + dest := u.Destination() + buffer.UDP = &dest + return buf.MultiBuffer{buffer}, nil } type UDPWriter struct { - request *protocol.RequestHeader - writer io.Writer + Writer io.Writer + Request *protocol.RequestHeader } -func NewUDPWriter(request *protocol.RequestHeader, writer io.Writer) *UDPWriter { - return &UDPWriter{ - request: request, - writer: writer, +func (w *UDPWriter) WriteMultiBuffer(mb buf.MultiBuffer) error { + for { + mb2, b := buf.SplitFirst(mb) + mb = mb2 + if b == nil { + break + } + request := w.Request + if b.UDP != nil { + request = &protocol.RequestHeader{ + Address: b.UDP.Address, + Port: b.UDP.Port, + } + } + packet, err := EncodeUDPPacket(request, b.Bytes()) + b.Release() + if err != nil { + buf.ReleaseMulti(mb) + return err + } + _, err = w.Writer.Write(packet.Bytes()) + packet.Release() + if err != nil { + buf.ReleaseMulti(mb) + return err + } } -} - -// Write implements io.Writer. -func (w *UDPWriter) Write(b []byte) (int, error) { - eb, err := EncodeUDPPacket(w.request, b) - if err != nil { - return 0, err - } - defer eb.Release() - if _, err := w.writer.Write(eb.Bytes()); err != nil { - return 0, err - } - return len(b), nil + return nil } func ClientHandshake(request *protocol.RequestHeader, reader io.Reader, writer io.Writer) (*protocol.RequestHeader, error) { diff --git a/proxy/socks/protocol_test.go b/proxy/socks/protocol_test.go index 006fb7f1..c201ef7d 100644 --- a/proxy/socks/protocol_test.go +++ b/proxy/socks/protocol_test.go @@ -20,14 +20,14 @@ func TestUDPEncoding(t *testing.T) { Address: net.IPAddress([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6}), Port: 1024, } - writer := &buf.SequentialWriter{Writer: NewUDPWriter(request, b)} + writer := &UDPWriter{Writer: b, Request: request} content := []byte{'a'} payload := buf.New() payload.Write(content) common.Must(writer.WriteMultiBuffer(buf.MultiBuffer{payload})) - reader := NewUDPReader(b) + reader := &UDPReader{Reader: b} decodedPayload, err := reader.ReadMultiBuffer() common.Must(err)