From bc4bf3d38f7b95c9b4bf58edbdc1219bac90094f Mon Sep 17 00:00:00 2001 From: RPRX <63339210+RPRX@users.noreply.github.com> Date: Sat, 9 Nov 2024 11:05:41 +0000 Subject: [PATCH] XHTTP: Add "stream-up" mode for client & server (#3994) --- infra/conf/transport_internet.go | 10 +++ .../internet/splithttp/browser_client.go | 4 + transport/internet/splithttp/client.go | 12 +++ transport/internet/splithttp/config.pb.go | 73 +++++++++------- transport/internet/splithttp/config.proto | 1 + transport/internet/splithttp/dialer.go | 84 ++++++++++--------- transport/internet/splithttp/hub.go | 33 ++++++-- transport/internet/splithttp/upload_queue.go | 24 ++++++ 8 files changed, 164 insertions(+), 77 deletions(-) diff --git a/infra/conf/transport_internet.go b/infra/conf/transport_internet.go index ed1ea072..f5a36328 100644 --- a/infra/conf/transport_internet.go +++ b/infra/conf/transport_internet.go @@ -233,6 +233,7 @@ type SplitHTTPConfig struct { XPaddingBytes *Int32Range `json:"xPaddingBytes"` Xmux Xmux `json:"xmux"` DownloadSettings *StreamConfig `json:"downloadSettings"` + Mode string `json:"mode"` } type Xmux struct { @@ -289,6 +290,14 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) { muxProtobuf.CMaxReuseTimes.To = 128 } + switch c.Mode { + case "": + c.Mode = "auto" + case "auto", "packet-up", "stream-up": + default: + return nil, errors.New("unsupported mode: " + c.Mode) + } + config := &splithttp.Config{ Path: c.Path, Host: c.Host, @@ -299,6 +308,7 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) { NoSSEHeader: c.NoSSEHeader, XPaddingBytes: splithttpNewRandRangeConfig(c.XPaddingBytes), Xmux: &muxProtobuf, + Mode: c.Mode, } var err error if c.DownloadSettings != nil { diff --git a/transport/internet/splithttp/browser_client.go b/transport/internet/splithttp/browser_client.go index 8a23682e..14ecaaf1 100644 --- a/transport/internet/splithttp/browser_client.go +++ b/transport/internet/splithttp/browser_client.go @@ -14,6 +14,10 @@ import ( // has no fields because everything is global state :O) type BrowserDialerClient struct{} +func (c *BrowserDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser { + panic("not implemented yet") +} + func (c *BrowserDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) { conn, err := browser_dialer.DialGet(baseURL) dummyAddr := &gonet.IPAddr{} diff --git a/transport/internet/splithttp/client.go b/transport/internet/splithttp/client.go index 3f278aa7..e5866b55 100644 --- a/transport/internet/splithttp/client.go +++ b/transport/internet/splithttp/client.go @@ -25,6 +25,10 @@ type DialerClient interface { // (ctx, baseURL) -> (downloadReader, remoteAddr, localAddr) // baseURL already contains sessionId OpenDownload(context.Context, string) (io.ReadCloser, net.Addr, net.Addr, error) + + // (ctx, baseURL) -> uploadWriter + // baseURL already contains sessionId + OpenUpload(context.Context, string) io.WriteCloser } // implements splithttp.DialerClient in terms of direct network connections @@ -38,6 +42,14 @@ type DefaultDialerClient struct { dialUploadConn func(ctxInner context.Context) (net.Conn, error) } +func (c *DefaultDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser { + reader, writer := io.Pipe() + req, _ := http.NewRequestWithContext(ctx, "POST", baseURL, reader) + req.Header = c.transportConfig.GetRequestHeader() + go c.client.Do(req) + return writer +} + func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) { var remoteAddr gonet.Addr var localAddr gonet.Addr diff --git a/transport/internet/splithttp/config.pb.go b/transport/internet/splithttp/config.pb.go index f3f80831..f2d1ab4f 100644 --- a/transport/internet/splithttp/config.pb.go +++ b/transport/internet/splithttp/config.pb.go @@ -36,6 +36,7 @@ type Config struct { XPaddingBytes *RandRangeConfig `protobuf:"bytes,8,opt,name=xPaddingBytes,proto3" json:"xPaddingBytes,omitempty"` Xmux *Multiplexing `protobuf:"bytes,9,opt,name=xmux,proto3" json:"xmux,omitempty"` DownloadSettings *internet.StreamConfig `protobuf:"bytes,10,opt,name=downloadSettings,proto3" json:"downloadSettings,omitempty"` + Mode string `protobuf:"bytes,11,opt,name=mode,proto3" json:"mode,omitempty"` } func (x *Config) Reset() { @@ -138,6 +139,13 @@ func (x *Config) GetDownloadSettings() *internet.StreamConfig { return nil } +func (x *Config) GetMode() string { + if x != nil { + return x.Mode + } + return "" +} + type RandRangeConfig struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -270,7 +278,7 @@ var file_transport_internet_splithttp_config_proto_rawDesc = []byte{ 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x1a, 0x1f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, - 0x82, 0x06, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, + 0x96, 0x06, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x4d, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x03, 0x20, 0x03, @@ -314,47 +322,48 @@ var file_transport_internet_splithttp_config_proto_rawDesc = []byte{ 0x32, 0x25, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x10, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, - 0x64, 0x53, 0x65, 0x74, 0x74, 0x69, 0x6e, 0x67, 0x73, 0x1a, 0x39, 0x0a, 0x0b, 0x48, 0x65, 0x61, - 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, - 0x3a, 0x02, 0x38, 0x01, 0x22, 0x35, 0x0a, 0x0f, 0x52, 0x61, 0x6e, 0x64, 0x52, 0x61, 0x6e, 0x67, - 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x12, 0x0e, 0x0a, 0x02, 0x74, - 0x6f, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x74, 0x6f, 0x22, 0xfe, 0x02, 0x0a, 0x0c, - 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x70, 0x6c, 0x65, 0x78, 0x69, 0x6e, 0x67, 0x12, 0x5a, 0x0a, 0x0e, - 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x18, 0x01, + 0x64, 0x53, 0x65, 0x74, 0x74, 0x69, 0x6e, 0x67, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6d, 0x6f, 0x64, + 0x65, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x1a, 0x39, 0x0a, + 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, + 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, + 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x35, 0x0a, 0x0f, 0x52, 0x61, 0x6e, 0x64, + 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x66, + 0x72, 0x6f, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x12, + 0x0e, 0x0a, 0x02, 0x74, 0x6f, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x74, 0x6f, 0x22, + 0xfe, 0x02, 0x0a, 0x0c, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x70, 0x6c, 0x65, 0x78, 0x69, 0x6e, 0x67, + 0x12, 0x5a, 0x0a, 0x0e, 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, + 0x63, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, + 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, + 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x2e, 0x52, 0x61, 0x6e, + 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0e, 0x6d, 0x61, + 0x78, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x12, 0x5a, 0x0a, 0x0e, + 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x2e, 0x52, 0x61, 0x6e, 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0e, 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, - 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x12, 0x5a, 0x0a, 0x0e, 0x6d, 0x61, 0x78, 0x43, - 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x5a, 0x0a, 0x0e, 0x63, 0x4d, 0x61, 0x78, + 0x52, 0x65, 0x75, 0x73, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x2e, 0x52, 0x61, 0x6e, 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0e, 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x5a, 0x0a, 0x0e, 0x63, 0x4d, 0x61, 0x78, 0x52, 0x65, 0x75, 0x73, - 0x65, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x78, + 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0e, 0x63, 0x4d, 0x61, 0x78, 0x52, 0x65, 0x75, 0x73, 0x65, 0x54, + 0x69, 0x6d, 0x65, 0x73, 0x12, 0x5a, 0x0a, 0x0e, 0x63, 0x4d, 0x61, 0x78, 0x4c, 0x69, 0x66, 0x65, + 0x74, 0x69, 0x6d, 0x65, 0x4d, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x2e, 0x52, 0x61, 0x6e, 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x52, 0x0e, 0x63, 0x4d, 0x61, 0x78, 0x52, 0x65, 0x75, 0x73, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x73, - 0x12, 0x5a, 0x0a, 0x0e, 0x63, 0x4d, 0x61, 0x78, 0x4c, 0x69, 0x66, 0x65, 0x74, 0x69, 0x6d, 0x65, - 0x4d, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, - 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, - 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x2e, 0x52, 0x61, 0x6e, - 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0e, 0x63, 0x4d, - 0x61, 0x78, 0x4c, 0x69, 0x66, 0x65, 0x74, 0x69, 0x6d, 0x65, 0x4d, 0x73, 0x42, 0x85, 0x01, 0x0a, - 0x25, 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, - 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, - 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x50, 0x01, 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, - 0x6f, 0x72, 0x65, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, - 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, - 0xaa, 0x02, 0x21, 0x58, 0x72, 0x61, 0x79, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, - 0x74, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x53, 0x70, 0x6c, 0x69, 0x74, - 0x48, 0x74, 0x74, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x52, 0x0e, 0x63, 0x4d, 0x61, 0x78, 0x4c, 0x69, 0x66, 0x65, 0x74, 0x69, 0x6d, 0x65, 0x4d, 0x73, + 0x42, 0x85, 0x01, 0x0a, 0x25, 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, + 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, + 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x50, 0x01, 0x5a, 0x36, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, + 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, + 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x73, 0x70, 0x6c, 0x69, 0x74, + 0x68, 0x74, 0x74, 0x70, 0xaa, 0x02, 0x21, 0x58, 0x72, 0x61, 0x79, 0x2e, 0x54, 0x72, 0x61, 0x6e, + 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x53, + 0x70, 0x6c, 0x69, 0x74, 0x48, 0x74, 0x74, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/transport/internet/splithttp/config.proto b/transport/internet/splithttp/config.proto index 384ae605..b5894620 100644 --- a/transport/internet/splithttp/config.proto +++ b/transport/internet/splithttp/config.proto @@ -19,6 +19,7 @@ message Config { RandRangeConfig xPaddingBytes = 8; Multiplexing xmux = 9; xray.transport.internet.StreamConfig downloadSettings = 10; + string mode = 11; } message RandRangeConfig { diff --git a/transport/internet/splithttp/dialer.go b/transport/internet/splithttp/dialer.go index 1c5300f3..a63d3253 100644 --- a/transport/internet/splithttp/dialer.go +++ b/transport/internet/splithttp/dialer.go @@ -254,9 +254,9 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me httpClient, muxRes := getHTTPClient(ctx, dest, streamSettings) - var httpClient2 DialerClient + httpClient2 := httpClient + requestURL2 := requestURL var muxRes2 *muxResource - var requestURL2 url.URL if transportConfiguration.DownloadSettings != nil { globalDialerAccess.Lock() if streamSettings.DownloadSettings == nil { @@ -279,11 +279,10 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me requestURL2.RawQuery = config2.GetNormalizedQuery() } - maxUploadSize := scMaxEachPostBytes.roll() - // WithSizeLimit(0) will still allow single bytes to pass, and a lot of - // code relies on this behavior. Subtract 1 so that together with - // uploadWriter wrapper, exact size limits can be enforced - uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - 1)) + reader, remoteAddr, localAddr, err := httpClient2.OpenDownload(context.WithoutCancel(ctx), requestURL2.String()) + if err != nil { + return nil, err + } if muxRes != nil { muxRes.OpenRequests.Add(1) @@ -291,15 +290,48 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me if muxRes2 != nil { muxRes2.OpenRequests.Add(1) } + closed := false + + conn := splitConn{ + writer: nil, + reader: reader, + remoteAddr: remoteAddr, + localAddr: localAddr, + onClose: func() { + if closed { + return + } + closed = true + if muxRes != nil { + muxRes.OpenRequests.Add(-1) + } + if muxRes2 != nil { + muxRes2.OpenRequests.Add(-1) + } + }, + } + + mode := transportConfiguration.Mode + if mode == "auto" && realityConfig != nil { + mode = "stream-up" + } + if mode == "stream-up" { + conn.writer = httpClient.OpenUpload(ctx, requestURL.String()) + return stat.Connection(&conn), nil + } + + maxUploadSize := scMaxEachPostBytes.roll() + // WithSizeLimit(0) will still allow single bytes to pass, and a lot of + // code relies on this behavior. Subtract 1 so that together with + // uploadWriter wrapper, exact size limits can be enforced + uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - 1)) + + conn.writer = uploadWriter{ + uploadPipeWriter, + maxUploadSize, + } go func() { - if muxRes != nil { - defer muxRes.OpenRequests.Add(-1) - } - if muxRes2 != nil { - defer muxRes2.OpenRequests.Add(-1) - } - requestsLimiter := semaphore.New(int(scMaxConcurrentPosts.roll())) var requestCounter int64 @@ -352,30 +384,6 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me } }() - httpClient3 := httpClient - requestURL3 := requestURL - if httpClient2 != nil { - httpClient3 = httpClient2 - requestURL3 = requestURL2 - } - - reader, remoteAddr, localAddr, err := httpClient3.OpenDownload(context.WithoutCancel(ctx), requestURL3.String()) - if err != nil { - return nil, err - } - - writer := uploadWriter{ - uploadPipeWriter, - maxUploadSize, - } - - conn := splitConn{ - writer: writer, - reader: reader, - remoteAddr: remoteAddr, - localAddr: localAddr, - } - return stat.Connection(&conn), nil } diff --git a/transport/internet/splithttp/hub.go b/transport/internet/splithttp/hub.go index b7244f10..58113f02 100644 --- a/transport/internet/splithttp/hub.go +++ b/transport/internet/splithttp/hub.go @@ -100,6 +100,8 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req return } + h.config.WriteResponseHeader(writer) + sessionId := "" subpath := strings.Split(request.URL.Path[len(h.path):], "/") if len(subpath) > 0 { @@ -134,7 +136,26 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req } if seq == "" { - errors.LogInfo(context.Background(), "no seq on request:", request.URL.Path) + if h.config.Mode == "packet-up" { + errors.LogInfo(context.Background(), "stream-up mode is not allowed") + writer.WriteHeader(http.StatusBadRequest) + return + } + err = currentSession.uploadQueue.Push(Packet{ + Reader: request.Body, + }) + if err != nil { + errors.LogInfoInner(context.Background(), err, "failed to upload (PushReader)") + writer.WriteHeader(http.StatusConflict) + } else { + writer.WriteHeader(http.StatusOK) + <-request.Context().Done() + } + return + } + + if h.config.Mode == "stream-up" { + errors.LogInfo(context.Background(), "packet-up mode is not allowed") writer.WriteHeader(http.StatusBadRequest) return } @@ -148,14 +169,14 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req } if err != nil { - errors.LogInfoInner(context.Background(), err, "failed to upload") + errors.LogInfoInner(context.Background(), err, "failed to upload (ReadAll)") writer.WriteHeader(http.StatusInternalServerError) return } seqInt, err := strconv.ParseUint(seq, 10, 64) if err != nil { - errors.LogInfoInner(context.Background(), err, "failed to upload") + errors.LogInfoInner(context.Background(), err, "failed to upload (ParseUint)") writer.WriteHeader(http.StatusInternalServerError) return } @@ -166,12 +187,11 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req }) if err != nil { - errors.LogInfoInner(context.Background(), err, "failed to upload") + errors.LogInfoInner(context.Background(), err, "failed to upload (PushPayload)") writer.WriteHeader(http.StatusInternalServerError) return } - h.config.WriteResponseHeader(writer) writer.WriteHeader(http.StatusOK) } else if request.Method == "GET" { responseFlusher, ok := writer.(http.Flusher) @@ -195,8 +215,6 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req writer.Header().Set("Content-Type", "text/event-stream") } - h.config.WriteResponseHeader(writer) - writer.WriteHeader(http.StatusOK) responseFlusher.Flush() @@ -223,6 +241,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req conn.Close() } else { + errors.LogInfo(context.Background(), "unsupported method: ", request.Method) writer.WriteHeader(http.StatusMethodNotAllowed) } } diff --git a/transport/internet/splithttp/upload_queue.go b/transport/internet/splithttp/upload_queue.go index 9ac38f8b..f8dd87b0 100644 --- a/transport/internet/splithttp/upload_queue.go +++ b/transport/internet/splithttp/upload_queue.go @@ -6,17 +6,20 @@ package splithttp import ( "container/heap" "io" + "runtime" "sync" "github.com/xtls/xray-core/common/errors" ) type Packet struct { + Reader io.ReadCloser Payload []byte Seq uint64 } type uploadQueue struct { + reader io.ReadCloser pushedPackets chan Packet writeCloseMutex sync.Mutex heap uploadHeap @@ -39,7 +42,16 @@ func (h *uploadQueue) Push(p Packet) error { h.writeCloseMutex.Lock() defer h.writeCloseMutex.Unlock() + runtime.Gosched() + if h.reader != nil && p.Reader != nil { + p.Reader.Close() + return errors.New("h.reader already exists") + } + if h.closed { + if p.Reader != nil { + p.Reader.Close() + } return errors.New("splithttp packet queue closed") } @@ -55,10 +67,18 @@ func (h *uploadQueue) Close() error { h.closed = true close(h.pushedPackets) } + runtime.Gosched() + if h.reader != nil { + return h.reader.Close() + } return nil } func (h *uploadQueue) Read(b []byte) (int, error) { + if h.reader != nil { + return h.reader.Read(b) + } + if h.closed { return 0, io.EOF } @@ -68,6 +88,10 @@ func (h *uploadQueue) Read(b []byte) (int, error) { if !more { return 0, io.EOF } + if packet.Reader != nil { + h.reader = packet.Reader + return h.reader.Read(b) + } heap.Push(&h.heap, packet) }