Collect stats and possible padding (previously Vision Reader Writer) to all traffic

This commit is contained in:
yuhan6665 2024-04-17 16:09:01 -04:00
parent 12b1f68335
commit 4ec189e6fe
4 changed files with 20 additions and 27 deletions

View File

@ -117,8 +117,8 @@ type TrafficState struct {
WriterSwitchToDirectCopy bool WriterSwitchToDirectCopy bool
} }
func NewTrafficState(userUUID []byte) *TrafficState { func NewTrafficState(userUUID []byte, flow string) *TrafficState {
return &TrafficState{ var state = TrafficState{
UserUUID: userUUID, UserUUID: userUUID,
StartTime: time.Time{}, StartTime: time.Time{},
ByteSent: 0, ByteSent: 0,
@ -131,15 +131,18 @@ func NewTrafficState(userUUID []byte) *TrafficState {
IsTLS: false, IsTLS: false,
Cipher: 0, Cipher: 0,
RemainingServerHello: -1, RemainingServerHello: -1,
WithinPaddingBuffers: true,
ReaderSwitchToDirectCopy: false, ReaderSwitchToDirectCopy: false,
RemainingCommand: -1, RemainingCommand: -1,
RemainingContent: -1, RemainingContent: -1,
RemainingPadding: -1, RemainingPadding: -1,
CurrentCommand: 0, CurrentCommand: 0,
IsPadding: true,
WriterSwitchToDirectCopy: false, WriterSwitchToDirectCopy: false,
};
if len(flow) > 0 {
state.IsPadding = true;
state.WithinPaddingBuffers = true;
} }
return &state
} }
// VisionReader is used to read xtls vision protocol // VisionReader is used to read xtls vision protocol
@ -181,7 +184,7 @@ func (w *VisionReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
} else if w.trafficState.CurrentCommand == 2 { } else if w.trafficState.CurrentCommand == 2 {
w.trafficState.WithinPaddingBuffers = false w.trafficState.WithinPaddingBuffers = false
w.trafficState.ReaderSwitchToDirectCopy = true w.trafficState.ReaderSwitchToDirectCopy = true
} else { } else if w.trafficState.CurrentCommand != -1 {
newError("XtlsRead unknown command ", w.trafficState.CurrentCommand, buffer.Len()).WriteToLog(session.ExportIDToError(w.ctx)) newError("XtlsRead unknown command ", w.trafficState.CurrentCommand, buffer.Len()).WriteToLog(session.ExportIDToError(w.ctx))
} }
} }

View File

@ -54,23 +54,21 @@ func DecodeHeaderAddons(buffer *buf.Buffer, reader io.Reader) (*Addons, error) {
} }
// EncodeBodyAddons returns a Writer that auto-encrypt content written by caller. // EncodeBodyAddons returns a Writer that auto-encrypt content written by caller.
func EncodeBodyAddons(writer io.Writer, request *protocol.RequestHeader, requestAddons *Addons, state *proxy.TrafficState, context context.Context) buf.Writer { func EncodeBodyAddons(writer buf.Writer, request *protocol.RequestHeader, requestAddons *Addons, state *proxy.TrafficState, context context.Context) buf.Writer {
w := proxy.NewVisionWriter(writer, state, context)
if request.Command == protocol.RequestCommandUDP { if request.Command == protocol.RequestCommandUDP {
return NewMultiLengthPacketWriter(writer.(buf.Writer)) return NewMultiLengthPacketWriter(w)
}
w := buf.NewWriter(writer)
if requestAddons.Flow == vless.XRV {
w = proxy.NewVisionWriter(w, state, context)
} }
return w return w
} }
// DecodeBodyAddons returns a Reader from which caller can fetch decrypted body. // DecodeBodyAddons returns a Reader from which caller can fetch decrypted body.
func DecodeBodyAddons(reader io.Reader, request *protocol.RequestHeader, addons *Addons) buf.Reader { func DecodeBodyAddons(reader io.Reader, request *protocol.RequestHeader, addons *Addons, state *proxy.TrafficState, context context.Context) buf.Reader {
r := proxy.NewVisionReader(buf.NewReader(reader), state, context)
if request.Command == protocol.RequestCommandUDP { if request.Command == protocol.RequestCommandUDP {
return NewLengthPacketReader(reader) return NewLengthPacketReader(&buf.BufferedReader{Reader: r})
} }
return buf.NewReader(reader) return r
} }
func NewMultiLengthPacketWriter(writer buf.Writer) *MultiLengthPacketWriter { func NewMultiLengthPacketWriter(writer buf.Writer) *MultiLengthPacketWriter {

View File

@ -515,18 +515,17 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
serverReader := link.Reader // .(*pipe.Reader) serverReader := link.Reader // .(*pipe.Reader)
serverWriter := link.Writer // .(*pipe.Writer) serverWriter := link.Writer // .(*pipe.Writer)
trafficState := proxy.NewTrafficState(account.ID.Bytes()) trafficState := proxy.NewTrafficState(account.ID.Bytes(), account.Flow)
postRequest := func() error { postRequest := func() error {
defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
// default: clientReader := reader // default: clientReader := reader
clientReader := encoding.DecodeBodyAddons(reader, request, requestAddons) clientReader := encoding.DecodeBodyAddons(reader, request, requestAddons, trafficState, ctx)
var err error var err error
if requestAddons.Flow == vless.XRV { if requestAddons.Flow == vless.XRV {
ctx1 := session.ContextWithInbound(ctx, nil) // TODO enable splice ctx1 := session.ContextWithInbound(ctx, nil) // TODO enable splice
clientReader = proxy.NewVisionReader(clientReader, trafficState, ctx1)
err = encoding.XtlsRead(clientReader, serverWriter, timer, connection, input, rawInput, trafficState, ctx1) err = encoding.XtlsRead(clientReader, serverWriter, timer, connection, input, rawInput, trafficState, ctx1)
} else { } else {
// from clientReader.ReadMultiBuffer to serverWriter.WriteMultiBufer // from clientReader.ReadMultiBuffer to serverWriter.WriteMultiBufer

View File

@ -184,7 +184,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
clientReader := link.Reader // .(*pipe.Reader) clientReader := link.Reader // .(*pipe.Reader)
clientWriter := link.Writer // .(*pipe.Writer) clientWriter := link.Writer // .(*pipe.Writer)
trafficState := proxy.NewTrafficState(account.ID.Bytes()) trafficState := proxy.NewTrafficState(account.ID.Bytes(), account.Flow)
if request.Command == protocol.RequestCommandUDP && (requestAddons.Flow == vless.XRV || (h.cone && request.Port != 53 && request.Port != 443)) { if request.Command == protocol.RequestCommandUDP && (requestAddons.Flow == vless.XRV || (h.cone && request.Port != 53 && request.Port != 443)) {
request.Command = protocol.RequestCommandMux request.Command = protocol.RequestCommandMux
request.Address = net.DomainAddress("v1.mux.cool") request.Address = net.DomainAddress("v1.mux.cool")
@ -265,16 +265,9 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
} }
// default: serverReader := buf.NewReader(conn) // default: serverReader := buf.NewReader(conn)
serverReader := encoding.DecodeBodyAddons(conn, request, responseAddons) serverReader := encoding.DecodeBodyAddons(conn, request, responseAddons, trafficState, ctx)
if requestAddons.Flow == vless.XRV {
serverReader = proxy.NewVisionReader(serverReader, trafficState, ctx)
}
if request.Command == protocol.RequestCommandMux && request.Port == 666 { if request.Command == protocol.RequestCommandMux && request.Port == 666 {
if requestAddons.Flow == vless.XRV {
serverReader = xudp.NewPacketReader(&buf.BufferedReader{Reader: serverReader}) serverReader = xudp.NewPacketReader(&buf.BufferedReader{Reader: serverReader})
} else {
serverReader = xudp.NewPacketReader(conn)
}
} }
if requestAddons.Flow == vless.XRV { if requestAddons.Flow == vless.XRV {