From 24a2be43ef997f500602127259d8d076d0c913c5 Mon Sep 17 00:00:00 2001 From: RPRX <63339210+RPRX@users.noreply.github.com> Date: Mon, 10 Apr 2023 10:36:07 +0800 Subject: [PATCH] Replace "only" with "xudpConcurrency" in Mux config --- app/proxyman/config.pb.go | 37 +++++----- app/proxyman/config.proto | 6 +- app/proxyman/outbound/handler.go | 116 ++++++++++++++++++++----------- common/mux/client.go | 1 - infra/conf/xray.go | 35 +++------- infra/conf/xray_test.go | 7 +- 6 files changed, 112 insertions(+), 90 deletions(-) diff --git a/app/proxyman/config.pb.go b/app/proxyman/config.pb.go index c3b221c4..664e6670 100644 --- a/app/proxyman/config.pb.go +++ b/app/proxyman/config.pb.go @@ -594,9 +594,9 @@ type MultiplexingConfig struct { // Whether or not Mux is enabled. Enabled bool `protobuf:"varint,1,opt,name=enabled,proto3" json:"enabled,omitempty"` // Max number of concurrent connections that one Mux connection can handle. - Concurrency uint32 `protobuf:"varint,2,opt,name=concurrency,proto3" json:"concurrency,omitempty"` - // Both(0), TCP(net.Network_TCP), UDP(net.Network_UDP). - Only uint32 `protobuf:"varint,3,opt,name=only,proto3" json:"only,omitempty"` + Concurrency int32 `protobuf:"varint,2,opt,name=concurrency,proto3" json:"concurrency,omitempty"` + // Transport XUDP in another Mux. + XudpConcurrency int32 `protobuf:"varint,3,opt,name=xudpConcurrency,proto3" json:"xudpConcurrency,omitempty"` } func (x *MultiplexingConfig) Reset() { @@ -638,16 +638,16 @@ func (x *MultiplexingConfig) GetEnabled() bool { return false } -func (x *MultiplexingConfig) GetConcurrency() uint32 { +func (x *MultiplexingConfig) GetConcurrency() int32 { if x != nil { return x.Concurrency } return 0 } -func (x *MultiplexingConfig) GetOnly() uint32 { +func (x *MultiplexingConfig) GetXudpConcurrency() int32 { if x != nil { - return x.Only + return x.XudpConcurrency } return 0 } @@ -865,22 +865,23 @@ var file_app_proxyman_config_proto_rawDesc = []byte{ 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x6d, 0x61, 0x6e, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x70, 0x6c, 0x65, 0x78, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x11, 0x6d, 0x75, 0x6c, 0x74, 0x69, - 0x70, 0x6c, 0x65, 0x78, 0x53, 0x65, 0x74, 0x74, 0x69, 0x6e, 0x67, 0x73, 0x22, 0x64, 0x0a, 0x12, + 0x70, 0x6c, 0x65, 0x78, 0x53, 0x65, 0x74, 0x74, 0x69, 0x6e, 0x67, 0x73, 0x22, 0x7a, 0x0a, 0x12, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x70, 0x6c, 0x65, 0x78, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0d, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x12, 0x12, - 0x0a, 0x04, 0x6f, 0x6e, 0x6c, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x6f, 0x6e, - 0x6c, 0x79, 0x2a, 0x23, 0x0a, 0x0e, 0x4b, 0x6e, 0x6f, 0x77, 0x6e, 0x50, 0x72, 0x6f, 0x74, 0x6f, - 0x63, 0x6f, 0x6c, 0x73, 0x12, 0x08, 0x0a, 0x04, 0x48, 0x54, 0x54, 0x50, 0x10, 0x00, 0x12, 0x07, - 0x0a, 0x03, 0x54, 0x4c, 0x53, 0x10, 0x01, 0x42, 0x55, 0x0a, 0x15, 0x63, 0x6f, 0x6d, 0x2e, 0x78, - 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x6d, 0x61, 0x6e, - 0x50, 0x01, 0x5a, 0x26, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, - 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x61, 0x70, - 0x70, 0x2f, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x6d, 0x61, 0x6e, 0xaa, 0x02, 0x11, 0x58, 0x72, 0x61, - 0x79, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x6d, 0x61, 0x6e, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x05, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x12, 0x28, + 0x0a, 0x0f, 0x78, 0x75, 0x64, 0x70, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, + 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0f, 0x78, 0x75, 0x64, 0x70, 0x43, 0x6f, 0x6e, + 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x2a, 0x23, 0x0a, 0x0e, 0x4b, 0x6e, 0x6f, 0x77, + 0x6e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x73, 0x12, 0x08, 0x0a, 0x04, 0x48, 0x54, + 0x54, 0x50, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x54, 0x4c, 0x53, 0x10, 0x01, 0x42, 0x55, 0x0a, + 0x15, 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x70, 0x72, + 0x6f, 0x78, 0x79, 0x6d, 0x61, 0x6e, 0x50, 0x01, 0x5a, 0x26, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, + 0x6f, 0x72, 0x65, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x6d, 0x61, 0x6e, + 0xaa, 0x02, 0x11, 0x58, 0x72, 0x61, 0x79, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x50, 0x72, 0x6f, 0x78, + 0x79, 0x6d, 0x61, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/app/proxyman/config.proto b/app/proxyman/config.proto index 8f282aa2..dc755fef 100644 --- a/app/proxyman/config.proto +++ b/app/proxyman/config.proto @@ -97,7 +97,7 @@ message MultiplexingConfig { // Whether or not Mux is enabled. bool enabled = 1; // Max number of concurrent connections that one Mux connection can handle. - uint32 concurrency = 2; - // Both(0), TCP(net.Network_TCP), UDP(net.Network_UDP). - uint32 only = 3; + int32 concurrency = 2; + // Transport XUDP in another Mux. + int32 xudpConcurrency = 3; } diff --git a/app/proxyman/outbound/handler.go b/app/proxyman/outbound/handler.go index 89e2862d..4de70df2 100644 --- a/app/proxyman/outbound/handler.go +++ b/app/proxyman/outbound/handler.go @@ -57,6 +57,7 @@ type Handler struct { proxy proxy.Outbound outboundManager outbound.Manager mux *mux.ClientManager + xudp *mux.ClientManager uplinkCounter stats.Counter downlinkCounter stats.Counter } @@ -106,23 +107,49 @@ func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (outbou } if h.senderSettings != nil && h.senderSettings.MultiplexSettings != nil { - config := h.senderSettings.MultiplexSettings - if config.Concurrency < 1 || config.Concurrency > 1024 { - return nil, newError("invalid mux concurrency: ", config.Concurrency).AtWarning() - } - h.mux = &mux.ClientManager{ - Enabled: config.Enabled, - Picker: &mux.IncrementalWorkerPicker{ - Factory: &mux.DialingWorkerFactory{ - Proxy: proxyHandler, - Dialer: h, - Strategy: mux.ClientStrategy{ - MaxConcurrency: config.Concurrency, - MaxConnection: 128, + if config := h.senderSettings.MultiplexSettings; config.Enabled { + if config.Concurrency < 0 { + h.mux = &mux.ClientManager{Enabled: false} + } + if config.Concurrency == 0 { + config.Concurrency = 8 // same as before + } + if config.Concurrency > 0 { + h.mux = &mux.ClientManager{ + Enabled: true, + Picker: &mux.IncrementalWorkerPicker{ + Factory: &mux.DialingWorkerFactory{ + Proxy: proxyHandler, + Dialer: h, + Strategy: mux.ClientStrategy{ + MaxConcurrency: uint32(config.Concurrency), + MaxConnection: 128, + }, + }, }, - }, - }, - Only: config.Only, + } + } + if config.XudpConcurrency < 0 { + h.xudp = &mux.ClientManager{Enabled: false} + } + if config.XudpConcurrency == 0 { + h.xudp = nil // same as before + } + if config.XudpConcurrency > 0 { + h.xudp = &mux.ClientManager{ + Enabled: true, + Picker: &mux.IncrementalWorkerPicker{ + Factory: &mux.DialingWorkerFactory{ + Proxy: proxyHandler, + Dialer: h, + Strategy: mux.ClientStrategy{ + MaxConcurrency: uint32(config.XudpConcurrency), + MaxConnection: 128, + }, + }, + }, + } + } } } @@ -137,33 +164,44 @@ func (h *Handler) Tag() string { // Dispatch implements proxy.Outbound.Dispatch. func (h *Handler) Dispatch(ctx context.Context, link *transport.Link) { - outbound := session.OutboundFromContext(ctx) - if h.mux != nil && (h.mux.Enabled || session.MuxPreferedFromContext(ctx)) && - (h.mux.Only == 0 || (outbound != nil && h.mux.Only == uint32(outbound.Target.Network))) { - if err := h.mux.Dispatch(ctx, link); err != nil { - err := newError("failed to process mux outbound traffic").Base(err) - session.SubmitOutboundErrorToOriginator(ctx, err) - err.WriteToLog(session.ExportIDToError(ctx)) - common.Interrupt(link.Writer) - } - } else { - err := h.proxy.Process(ctx, link, h) - if err != nil { - if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) || errors.Is(err, context.Canceled) { - err = nil + if h.mux != nil { + test := func(err error) { + if err != nil { + err := newError("failed to process mux outbound traffic").Base(err) + session.SubmitOutboundErrorToOriginator(ctx, err) + err.WriteToLog(session.ExportIDToError(ctx)) + common.Interrupt(link.Writer) } } - if err != nil { - // Ensure outbound ray is properly closed. - err := newError("failed to process outbound traffic").Base(err) - session.SubmitOutboundErrorToOriginator(ctx, err) - err.WriteToLog(session.ExportIDToError(ctx)) - common.Interrupt(link.Writer) - } else { - common.Must(common.Close(link.Writer)) + if h.xudp != nil && session.OutboundFromContext(ctx).Target.Network == net.Network_UDP { + if !h.xudp.Enabled { + goto out + } + test(h.xudp.Dispatch(ctx, link)) + return + } + if h.mux.Enabled { + test(h.mux.Dispatch(ctx, link)) + return } - common.Interrupt(link.Reader) } +out: + err := h.proxy.Process(ctx, link, h) + if err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) || errors.Is(err, context.Canceled) { + err = nil + } + } + if err != nil { + // Ensure outbound ray is properly closed. + err := newError("failed to process outbound traffic").Base(err) + session.SubmitOutboundErrorToOriginator(ctx, err) + err.WriteToLog(session.ExportIDToError(ctx)) + common.Interrupt(link.Writer) + } else { + common.Must(common.Close(link.Writer)) + } + common.Interrupt(link.Reader) } // Address implements internet.Dialer. diff --git a/common/mux/client.go b/common/mux/client.go index f933ef4c..88621be0 100644 --- a/common/mux/client.go +++ b/common/mux/client.go @@ -24,7 +24,6 @@ import ( type ClientManager struct { Enabled bool // wheather mux is enabled from user config Picker WorkerPicker - Only uint32 } func (m *ClientManager) Dispatch(ctx context.Context, link *transport.Link) error { diff --git a/infra/conf/xray.go b/infra/conf/xray.go index 63ffccf5..07c667fe 100644 --- a/infra/conf/xray.go +++ b/infra/conf/xray.go @@ -10,7 +10,6 @@ import ( "github.com/xtls/xray-core/app/dispatcher" "github.com/xtls/xray-core/app/proxyman" "github.com/xtls/xray-core/app/stats" - "github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/serial" core "github.com/xtls/xray-core/core" "github.com/xtls/xray-core/transport/internet" @@ -108,36 +107,18 @@ func (c *SniffingConfig) Build() (*proxyman.SniffingConfig, error) { } type MuxConfig struct { - Enabled bool `json:"enabled"` - Concurrency int16 `json:"concurrency"` - Only string `json:"only"` + Enabled bool `json:"enabled"` + Concurrency int16 `json:"concurrency"` + XudpConcurrency int16 `json:"xudpConcurrency"` } // Build creates MultiplexingConfig, Concurrency < 0 completely disables mux. func (m *MuxConfig) Build() (*proxyman.MultiplexingConfig, error) { - if m.Concurrency < 0 { - return nil, nil - } - if m.Concurrency == 0 { - m.Concurrency = 8 - } - - config := &proxyman.MultiplexingConfig{ - Enabled: m.Enabled, - Concurrency: uint32(m.Concurrency), - } - - switch strings.ToLower(m.Only) { - case "", "both": - case "tcp": - config.Only = uint32(net.Network_TCP) - case "udp": - config.Only = uint32(net.Network_UDP) - default: - return nil, newError(`unknown "only": `, m.Only) - } - - return config, nil + return &proxyman.MultiplexingConfig{ + Enabled: m.Enabled, + Concurrency: int32(m.Concurrency), + XudpConcurrency: int32(m.XudpConcurrency), + }, nil } type InboundDetourAllocationConfig struct { diff --git a/infra/conf/xray_test.go b/infra/conf/xray_test.go index 02d868fe..91b82eea 100644 --- a/infra/conf/xray_test.go +++ b/infra/conf/xray_test.go @@ -345,13 +345,16 @@ func TestMuxConfig_Build(t *testing.T) { }}, {"empty def", `{}`, &proxyman.MultiplexingConfig{ Enabled: false, - Concurrency: 8, + Concurrency: 0, }}, {"not enable", `{"enabled": false, "concurrency": 4}`, &proxyman.MultiplexingConfig{ Enabled: false, Concurrency: 4, }}, - {"forbidden", `{"enabled": false, "concurrency": -1}`, nil}, + {"forbidden", `{"enabled": false, "concurrency": -1}`, &proxyman.MultiplexingConfig{ + Enabled: false, + Concurrency: -1, + }}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {