From c10bd28731a334f85269df64bb544560a24bf9d2 Mon Sep 17 00:00:00 2001 From: mmmray <142015632+mmmray@users.noreply.github.com> Date: Tue, 18 Jun 2024 07:36:36 +0200 Subject: [PATCH] Add SplitHTTP Transport (#3412) --- infra/conf/transport.go | 12 + infra/conf/transport_internet.go | 42 +++ infra/conf/xray.go | 3 + main/distro/all/all.go | 1 + transport/internet/config.pb.go | 42 +-- transport/internet/config.proto | 1 + transport/internet/splithttp/config.go | 52 +++ transport/internet/splithttp/config.pb.go | 202 +++++++++++ transport/internet/splithttp/config.proto | 15 + transport/internet/splithttp/connection.go | 59 ++++ transport/internet/splithttp/dialer.go | 316 ++++++++++++++++++ .../internet/splithttp/errors.generated.go | 9 + transport/internet/splithttp/hub.go | 262 +++++++++++++++ transport/internet/splithttp/splithttp.go | 17 + .../internet/splithttp/splithttp_test.go | 154 +++++++++ transport/internet/splithttp/upload_queue.go | 120 +++++++ 16 files changed, 1288 insertions(+), 19 deletions(-) create mode 100644 transport/internet/splithttp/config.go create mode 100644 transport/internet/splithttp/config.pb.go create mode 100644 transport/internet/splithttp/config.proto create mode 100644 transport/internet/splithttp/connection.go create mode 100644 transport/internet/splithttp/dialer.go create mode 100644 transport/internet/splithttp/errors.generated.go create mode 100644 transport/internet/splithttp/hub.go create mode 100644 transport/internet/splithttp/splithttp.go create mode 100644 transport/internet/splithttp/splithttp_test.go create mode 100644 transport/internet/splithttp/upload_queue.go diff --git a/infra/conf/transport.go b/infra/conf/transport.go index b7969e47..1e0e70de 100644 --- a/infra/conf/transport.go +++ b/infra/conf/transport.go @@ -16,6 +16,7 @@ type TransportConfig struct { GRPCConfig *GRPCConfig `json:"grpcSettings"` GUNConfig *GRPCConfig `json:"gunSettings"` HTTPUPGRADEConfig *HttpUpgradeConfig `json:"httpupgradeSettings"` + SplitHTTPConfig *SplitHTTPConfig `json:"splithttpSettings"` } // Build implements Buildable. @@ -113,5 +114,16 @@ func (c *TransportConfig) Build() (*global.Config, error) { }) } + if c.SplitHTTPConfig != nil { + shs, err := c.SplitHTTPConfig.Build() + if err != nil { + return nil, newError("failed to build SplitHTTP config").Base(err) + } + config.TransportSettings = append(config.TransportSettings, &internet.TransportConfig{ + ProtocolName: "splithttp", + Settings: serial.ToTypedMessage(shs), + }) + } + return config, nil } diff --git a/infra/conf/transport_internet.go b/infra/conf/transport_internet.go index 3ebb5326..12ed4197 100644 --- a/infra/conf/transport_internet.go +++ b/infra/conf/transport_internet.go @@ -23,6 +23,7 @@ import ( "github.com/xtls/xray-core/transport/internet/kcp" "github.com/xtls/xray-core/transport/internet/quic" "github.com/xtls/xray-core/transport/internet/reality" + "github.com/xtls/xray-core/transport/internet/splithttp" "github.com/xtls/xray-core/transport/internet/tcp" "github.com/xtls/xray-core/transport/internet/tls" "github.com/xtls/xray-core/transport/internet/websocket" @@ -223,6 +224,34 @@ func (c *HttpUpgradeConfig) Build() (proto.Message, error) { return config, nil } +type SplitHTTPConfig struct { + Host string `json:"host"` + Path string `json:"path"` + Headers map[string]string `json:"headers"` + MaxConcurrentUploads int32 `json:"maxConcurrentUploads"` + MaxUploadSize int32 `json:"maxUploadSize"` +} + +// Build implements Buildable. +func (c *SplitHTTPConfig) Build() (proto.Message, error) { + // If http host is not set in the Host field, but in headers field, we add it to Host Field here. + // If we don't do that, http host will be overwritten as address. + // Host priority: Host field > headers field > address. + if c.Host == "" && c.Headers["host"] != "" { + c.Host = c.Headers["host"] + } else if c.Host == "" && c.Headers["Host"] != "" { + c.Host = c.Headers["Host"] + } + config := &splithttp.Config{ + Path: c.Path, + Host: c.Host, + Header: c.Headers, + MaxConcurrentUploads: c.MaxConcurrentUploads, + MaxUploadSize: c.MaxUploadSize, + } + return config, nil +} + type HTTPConfig struct { Host *StringList `json:"host"` Path string `json:"path"` @@ -648,6 +677,8 @@ func (p TransportProtocol) Build() (string, error) { return "grpc", nil case "httpupgrade": return "httpupgrade", nil + case "splithttp": + return "splithttp", nil default: return "", newError("Config: unknown transport protocol: ", p) } @@ -762,6 +793,7 @@ type StreamConfig struct { GRPCConfig *GRPCConfig `json:"grpcSettings"` GUNConfig *GRPCConfig `json:"gunSettings"` HTTPUPGRADESettings *HttpUpgradeConfig `json:"httpupgradeSettings"` + SplitHTTPSettings *SplitHTTPConfig `json:"splithttpSettings"` } // Build implements Buildable. @@ -892,6 +924,16 @@ func (c *StreamConfig) Build() (*internet.StreamConfig, error) { Settings: serial.ToTypedMessage(hs), }) } + if c.SplitHTTPSettings != nil { + hs, err := c.SplitHTTPSettings.Build() + if err != nil { + return nil, newError("Failed to build SplitHTTP config.").Base(err) + } + config.TransportSettings = append(config.TransportSettings, &internet.TransportConfig{ + ProtocolName: "splithttp", + Settings: serial.ToTypedMessage(hs), + }) + } if c.SocketSettings != nil { ss, err := c.SocketSettings.Build() if err != nil { diff --git a/infra/conf/xray.go b/infra/conf/xray.go index e48e04b2..5e1adcbc 100644 --- a/infra/conf/xray.go +++ b/infra/conf/xray.go @@ -557,6 +557,9 @@ func applyTransportConfig(s *StreamConfig, t *TransportConfig) { if s.HTTPUPGRADESettings == nil { s.HTTPUPGRADESettings = t.HTTPUPGRADEConfig } + if s.SplitHTTPSettings == nil { + s.SplitHTTPSettings = t.SplitHTTPConfig + } } // Build implements Buildable. diff --git a/main/distro/all/all.go b/main/distro/all/all.go index 0f0397f5..ae717653 100644 --- a/main/distro/all/all.go +++ b/main/distro/all/all.go @@ -57,6 +57,7 @@ import ( _ "github.com/xtls/xray-core/transport/internet/kcp" _ "github.com/xtls/xray-core/transport/internet/quic" _ "github.com/xtls/xray-core/transport/internet/reality" + _ "github.com/xtls/xray-core/transport/internet/splithttp" _ "github.com/xtls/xray-core/transport/internet/tcp" _ "github.com/xtls/xray-core/transport/internet/tls" _ "github.com/xtls/xray-core/transport/internet/udp" diff --git a/transport/internet/config.pb.go b/transport/internet/config.pb.go index e262a1a8..55010f00 100644 --- a/transport/internet/config.pb.go +++ b/transport/internet/config.pb.go @@ -31,6 +31,7 @@ const ( TransportProtocol_HTTP TransportProtocol = 4 TransportProtocol_DomainSocket TransportProtocol = 5 TransportProtocol_HTTPUpgrade TransportProtocol = 6 + TransportProtocol_SplitHTTP TransportProtocol = 7 ) // Enum value maps for TransportProtocol. @@ -43,6 +44,7 @@ var ( 4: "HTTP", 5: "DomainSocket", 6: "HTTPUpgrade", + 7: "SplitHTTP", } TransportProtocol_value = map[string]int32{ "TCP": 0, @@ -52,6 +54,7 @@ var ( "HTTP": 4, "DomainSocket": 5, "HTTPUpgrade": 6, + "SplitHTTP": 7, } ) @@ -728,31 +731,32 @@ var file_transport_internet_config_proto_rawDesc = []byte{ 0x4d, 0x70, 0x74, 0x63, 0x70, 0x22, 0x2f, 0x0a, 0x0a, 0x54, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x4f, 0x66, 0x66, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x54, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x65, 0x64, 0x69, - 0x72, 0x65, 0x63, 0x74, 0x10, 0x02, 0x2a, 0x6b, 0x0a, 0x11, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, + 0x72, 0x65, 0x63, 0x74, 0x10, 0x02, 0x2a, 0x7a, 0x0a, 0x11, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x07, 0x0a, 0x03, 0x54, 0x43, 0x50, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x55, 0x44, 0x50, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4b, 0x43, 0x50, 0x10, 0x02, 0x12, 0x0d, 0x0a, 0x09, 0x57, 0x65, 0x62, 0x53, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x10, 0x03, 0x12, 0x08, 0x0a, 0x04, 0x48, 0x54, 0x54, 0x50, 0x10, 0x04, 0x12, 0x10, 0x0a, 0x0c, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x53, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x10, 0x05, 0x12, 0x0f, 0x0a, 0x0b, 0x48, 0x54, 0x54, 0x50, 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, - 0x65, 0x10, 0x06, 0x2a, 0xa9, 0x01, 0x0a, 0x0e, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x53, 0x74, - 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x12, 0x09, 0x0a, 0x05, 0x41, 0x53, 0x5f, 0x49, 0x53, 0x10, - 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x55, 0x53, 0x45, 0x5f, 0x49, 0x50, 0x10, 0x01, 0x12, 0x0b, 0x0a, - 0x07, 0x55, 0x53, 0x45, 0x5f, 0x49, 0x50, 0x34, 0x10, 0x02, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x53, - 0x45, 0x5f, 0x49, 0x50, 0x36, 0x10, 0x03, 0x12, 0x0c, 0x0a, 0x08, 0x55, 0x53, 0x45, 0x5f, 0x49, - 0x50, 0x34, 0x36, 0x10, 0x04, 0x12, 0x0c, 0x0a, 0x08, 0x55, 0x53, 0x45, 0x5f, 0x49, 0x50, 0x36, - 0x34, 0x10, 0x05, 0x12, 0x0c, 0x0a, 0x08, 0x46, 0x4f, 0x52, 0x43, 0x45, 0x5f, 0x49, 0x50, 0x10, - 0x06, 0x12, 0x0d, 0x0a, 0x09, 0x46, 0x4f, 0x52, 0x43, 0x45, 0x5f, 0x49, 0x50, 0x34, 0x10, 0x07, - 0x12, 0x0d, 0x0a, 0x09, 0x46, 0x4f, 0x52, 0x43, 0x45, 0x5f, 0x49, 0x50, 0x36, 0x10, 0x08, 0x12, - 0x0e, 0x0a, 0x0a, 0x46, 0x4f, 0x52, 0x43, 0x45, 0x5f, 0x49, 0x50, 0x34, 0x36, 0x10, 0x09, 0x12, - 0x0e, 0x0a, 0x0a, 0x46, 0x4f, 0x52, 0x43, 0x45, 0x5f, 0x49, 0x50, 0x36, 0x34, 0x10, 0x0a, 0x42, - 0x67, 0x0a, 0x1b, 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, - 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x50, 0x01, - 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, - 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x74, 0x72, 0x61, 0x6e, - 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0xaa, 0x02, - 0x17, 0x58, 0x72, 0x61, 0x79, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, - 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x10, 0x06, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x70, 0x6c, 0x69, 0x74, 0x48, 0x54, 0x54, 0x50, + 0x10, 0x07, 0x2a, 0xa9, 0x01, 0x0a, 0x0e, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x53, 0x74, 0x72, + 0x61, 0x74, 0x65, 0x67, 0x79, 0x12, 0x09, 0x0a, 0x05, 0x41, 0x53, 0x5f, 0x49, 0x53, 0x10, 0x00, + 0x12, 0x0a, 0x0a, 0x06, 0x55, 0x53, 0x45, 0x5f, 0x49, 0x50, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, + 0x55, 0x53, 0x45, 0x5f, 0x49, 0x50, 0x34, 0x10, 0x02, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x53, 0x45, + 0x5f, 0x49, 0x50, 0x36, 0x10, 0x03, 0x12, 0x0c, 0x0a, 0x08, 0x55, 0x53, 0x45, 0x5f, 0x49, 0x50, + 0x34, 0x36, 0x10, 0x04, 0x12, 0x0c, 0x0a, 0x08, 0x55, 0x53, 0x45, 0x5f, 0x49, 0x50, 0x36, 0x34, + 0x10, 0x05, 0x12, 0x0c, 0x0a, 0x08, 0x46, 0x4f, 0x52, 0x43, 0x45, 0x5f, 0x49, 0x50, 0x10, 0x06, + 0x12, 0x0d, 0x0a, 0x09, 0x46, 0x4f, 0x52, 0x43, 0x45, 0x5f, 0x49, 0x50, 0x34, 0x10, 0x07, 0x12, + 0x0d, 0x0a, 0x09, 0x46, 0x4f, 0x52, 0x43, 0x45, 0x5f, 0x49, 0x50, 0x36, 0x10, 0x08, 0x12, 0x0e, + 0x0a, 0x0a, 0x46, 0x4f, 0x52, 0x43, 0x45, 0x5f, 0x49, 0x50, 0x34, 0x36, 0x10, 0x09, 0x12, 0x0e, + 0x0a, 0x0a, 0x46, 0x4f, 0x52, 0x43, 0x45, 0x5f, 0x49, 0x50, 0x36, 0x34, 0x10, 0x0a, 0x42, 0x67, + 0x0a, 0x1b, 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, + 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x50, 0x01, 0x5a, + 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, + 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, + 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0xaa, 0x02, 0x17, + 0x58, 0x72, 0x61, 0x79, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x49, + 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/transport/internet/config.proto b/transport/internet/config.proto index 22277b42..207cdff3 100644 --- a/transport/internet/config.proto +++ b/transport/internet/config.proto @@ -16,6 +16,7 @@ enum TransportProtocol { HTTP = 4; DomainSocket = 5; HTTPUpgrade = 6; + SplitHTTP = 7; } enum DomainStrategy { diff --git a/transport/internet/splithttp/config.go b/transport/internet/splithttp/config.go new file mode 100644 index 00000000..4c44e361 --- /dev/null +++ b/transport/internet/splithttp/config.go @@ -0,0 +1,52 @@ +package splithttp + +import ( + "net/http" + + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/transport/internet" +) + +func (c *Config) GetNormalizedPath() string { + path := c.Path + if path == "" { + path = "/" + } + if path[0] != '/' { + path = "/" + path + } + if path[len(path)-1] != '/' { + path = path + "/" + } + return path +} + +func (c *Config) GetRequestHeader() http.Header { + header := http.Header{} + for k, v := range c.Header { + header.Add(k, v) + } + return header +} + +func (c *Config) GetNormalizedMaxConcurrentUploads() int32 { + if c.MaxConcurrentUploads == 0 { + return 10 + } + + return c.MaxConcurrentUploads +} + +func (c *Config) GetNormalizedMaxUploadSize() int32 { + if c.MaxUploadSize == 0 { + return 1000000 + } + + return c.MaxUploadSize +} + +func init() { + common.Must(internet.RegisterProtocolConfigCreator(protocolName, func() interface{} { + return new(Config) + })) +} diff --git a/transport/internet/splithttp/config.pb.go b/transport/internet/splithttp/config.pb.go new file mode 100644 index 00000000..3975b9f6 --- /dev/null +++ b/transport/internet/splithttp/config.pb.go @@ -0,0 +1,202 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.1 +// protoc v5.27.0 +// source: transport/internet/splithttp/config.proto + +package splithttp + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Config struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` + Path string `protobuf:"bytes,2,opt,name=path,proto3" json:"path,omitempty"` + Header map[string]string `protobuf:"bytes,3,rep,name=header,proto3" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + MaxConcurrentUploads int32 `protobuf:"varint,4,opt,name=maxConcurrentUploads,proto3" json:"maxConcurrentUploads,omitempty"` + MaxUploadSize int32 `protobuf:"varint,5,opt,name=maxUploadSize,proto3" json:"maxUploadSize,omitempty"` +} + +func (x *Config) Reset() { + *x = Config{} + if protoimpl.UnsafeEnabled { + mi := &file_transport_internet_splithttp_config_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Config) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Config) ProtoMessage() {} + +func (x *Config) ProtoReflect() protoreflect.Message { + mi := &file_transport_internet_splithttp_config_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Config.ProtoReflect.Descriptor instead. +func (*Config) Descriptor() ([]byte, []int) { + return file_transport_internet_splithttp_config_proto_rawDescGZIP(), []int{0} +} + +func (x *Config) GetHost() string { + if x != nil { + return x.Host + } + return "" +} + +func (x *Config) GetPath() string { + if x != nil { + return x.Path + } + return "" +} + +func (x *Config) GetHeader() map[string]string { + if x != nil { + return x.Header + } + return nil +} + +func (x *Config) GetMaxConcurrentUploads() int32 { + if x != nil { + return x.MaxConcurrentUploads + } + return 0 +} + +func (x *Config) GetMaxUploadSize() int32 { + if x != nil { + return x.MaxUploadSize + } + return 0 +} + +var File_transport_internet_splithttp_config_proto protoreflect.FileDescriptor + +var file_transport_internet_splithttp_config_proto_rawDesc = []byte{ + 0x0a, 0x29, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x2f, 0x63, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x21, 0x78, 0x72, 0x61, + 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x22, 0x94, + 0x02, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, + 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x12, 0x0a, + 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, + 0x68, 0x12, 0x4d, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x03, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x35, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, + 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, + 0x74, 0x68, 0x74, 0x74, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x48, 0x65, 0x61, + 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, + 0x12, 0x32, 0x0a, 0x14, 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, + 0x74, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x14, + 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x55, 0x70, 0x6c, + 0x6f, 0x61, 0x64, 0x73, 0x12, 0x24, 0x0a, 0x0d, 0x6d, 0x61, 0x78, 0x55, 0x70, 0x6c, 0x6f, 0x61, + 0x64, 0x53, 0x69, 0x7a, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0d, 0x6d, 0x61, 0x78, + 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x53, 0x69, 0x7a, 0x65, 0x1a, 0x39, 0x0a, 0x0b, 0x48, 0x65, + 0x61, 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, 0x85, 0x01, 0x0a, 0x25, 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, + 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, + 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x50, + 0x01, 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, + 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x74, 0x72, 0x61, + 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, + 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0xaa, 0x02, 0x21, 0x58, 0x72, 0x61, 0x79, + 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, + 0x6e, 0x65, 0x74, 0x2e, 0x53, 0x70, 0x6c, 0x69, 0x74, 0x48, 0x74, 0x74, 0x70, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_transport_internet_splithttp_config_proto_rawDescOnce sync.Once + file_transport_internet_splithttp_config_proto_rawDescData = file_transport_internet_splithttp_config_proto_rawDesc +) + +func file_transport_internet_splithttp_config_proto_rawDescGZIP() []byte { + file_transport_internet_splithttp_config_proto_rawDescOnce.Do(func() { + file_transport_internet_splithttp_config_proto_rawDescData = protoimpl.X.CompressGZIP(file_transport_internet_splithttp_config_proto_rawDescData) + }) + return file_transport_internet_splithttp_config_proto_rawDescData +} + +var file_transport_internet_splithttp_config_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_transport_internet_splithttp_config_proto_goTypes = []interface{}{ + (*Config)(nil), // 0: xray.transport.internet.splithttp.Config + nil, // 1: xray.transport.internet.splithttp.Config.HeaderEntry +} +var file_transport_internet_splithttp_config_proto_depIdxs = []int32{ + 1, // 0: xray.transport.internet.splithttp.Config.header:type_name -> xray.transport.internet.splithttp.Config.HeaderEntry + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_transport_internet_splithttp_config_proto_init() } +func file_transport_internet_splithttp_config_proto_init() { + if File_transport_internet_splithttp_config_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_transport_internet_splithttp_config_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Config); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_transport_internet_splithttp_config_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_transport_internet_splithttp_config_proto_goTypes, + DependencyIndexes: file_transport_internet_splithttp_config_proto_depIdxs, + MessageInfos: file_transport_internet_splithttp_config_proto_msgTypes, + }.Build() + File_transport_internet_splithttp_config_proto = out.File + file_transport_internet_splithttp_config_proto_rawDesc = nil + file_transport_internet_splithttp_config_proto_goTypes = nil + file_transport_internet_splithttp_config_proto_depIdxs = nil +} diff --git a/transport/internet/splithttp/config.proto b/transport/internet/splithttp/config.proto new file mode 100644 index 00000000..43cecee2 --- /dev/null +++ b/transport/internet/splithttp/config.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package xray.transport.internet.splithttp; +option csharp_namespace = "Xray.Transport.Internet.SplitHttp"; +option go_package = "github.com/xtls/xray-core/transport/internet/splithttp"; +option java_package = "com.xray.transport.internet.splithttp"; +option java_multiple_files = true; + +message Config { + string host = 1; + string path = 2; + map header = 3; + int32 maxConcurrentUploads = 4; + int32 maxUploadSize = 5; +} diff --git a/transport/internet/splithttp/connection.go b/transport/internet/splithttp/connection.go new file mode 100644 index 00000000..697381d4 --- /dev/null +++ b/transport/internet/splithttp/connection.go @@ -0,0 +1,59 @@ +package splithttp + +import ( + "io" + "net" + "time" +) + +type splitConn struct { + writer io.WriteCloser + reader io.ReadCloser + remoteAddr net.Addr + localAddr net.Addr +} + +func (c *splitConn) Write(b []byte) (int, error) { + return c.writer.Write(b) +} + +func (c *splitConn) Read(b []byte) (int, error) { + return c.reader.Read(b) +} + +func (c *splitConn) Close() error { + err := c.writer.Close() + err2 := c.reader.Close() + if err != nil { + return err + } + + if err2 != nil { + return err + } + + return nil +} + +func (c *splitConn) LocalAddr() net.Addr { + return c.localAddr +} + +func (c *splitConn) RemoteAddr() net.Addr { + return c.remoteAddr +} + +func (c *splitConn) SetDeadline(t time.Time) error { + // TODO cannot do anything useful + return nil +} + +func (c *splitConn) SetReadDeadline(t time.Time) error { + // TODO cannot do anything useful + return nil +} + +func (c *splitConn) SetWriteDeadline(t time.Time) error { + // TODO cannot do anything useful + return nil +} diff --git a/transport/internet/splithttp/dialer.go b/transport/internet/splithttp/dialer.go new file mode 100644 index 00000000..b763981a --- /dev/null +++ b/transport/internet/splithttp/dialer.go @@ -0,0 +1,316 @@ +package splithttp + +import ( + "context" + gotls "crypto/tls" + "io" + gonet "net" + "net/http" + "net/http/httptrace" + "net/url" + "strconv" + "sync" + "time" + + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/common/buf" + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/session" + "github.com/xtls/xray-core/common/signal/semaphore" + "github.com/xtls/xray-core/common/uuid" + "github.com/xtls/xray-core/transport/internet" + "github.com/xtls/xray-core/transport/internet/stat" + "github.com/xtls/xray-core/transport/internet/tls" + "github.com/xtls/xray-core/transport/pipe" + "golang.org/x/net/http2" +) + +type dialerConf struct { + net.Destination + *internet.MemoryStreamConfig +} + +type reusedClient struct { + download *http.Client + upload *http.Client + isH2 bool + // pool of net.Conn, created using dialUploadConn + uploadRawPool *sync.Pool + dialUploadConn func(ctxInner context.Context) (net.Conn, error) +} + +var ( + globalDialerMap map[dialerConf]reusedClient + globalDialerAccess sync.Mutex +) + +func destroyHTTPClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) { + globalDialerAccess.Lock() + defer globalDialerAccess.Unlock() + + if globalDialerMap == nil { + globalDialerMap = make(map[dialerConf]reusedClient) + } + + delete(globalDialerMap, dialerConf{dest, streamSettings}) + +} + +func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) reusedClient { + globalDialerAccess.Lock() + defer globalDialerAccess.Unlock() + + if globalDialerMap == nil { + globalDialerMap = make(map[dialerConf]reusedClient) + } + + if client, found := globalDialerMap[dialerConf{dest, streamSettings}]; found { + return client + } + + tlsConfig := tls.ConfigFromStreamSettings(streamSettings) + + var gotlsConfig *gotls.Config + + if tlsConfig != nil { + gotlsConfig = tlsConfig.GetTLSConfig(tls.WithDestination(dest)) + } + + dialContext := func(ctxInner context.Context) (net.Conn, error) { + conn, err := internet.DialSystem(ctx, dest, streamSettings.SocketSettings) + if err != nil { + return nil, err + } + + if gotlsConfig != nil { + if fingerprint := tls.GetFingerprint(tlsConfig.Fingerprint); fingerprint != nil { + conn = tls.UClient(conn, gotlsConfig, fingerprint) + if err := conn.(*tls.UConn).HandshakeContext(ctx); err != nil { + return nil, err + } + } else { + conn = tls.Client(conn, gotlsConfig) + } + } + + return conn, nil + } + + var uploadTransport http.RoundTripper + var downloadTransport http.RoundTripper + + if tlsConfig != nil { + downloadTransport = &http2.Transport{ + DialTLSContext: func(ctxInner context.Context, network string, addr string, cfg *gotls.Config) (net.Conn, error) { + return dialContext(ctxInner) + }, + IdleConnTimeout: 90 * time.Second, + } + uploadTransport = downloadTransport + } else { + httpDialContext := func(ctxInner context.Context, network string, addr string) (net.Conn, error) { + return dialContext(ctxInner) + } + + downloadTransport = &http.Transport{ + DialTLSContext: httpDialContext, + DialContext: httpDialContext, + IdleConnTimeout: 90 * time.Second, + // chunked transfer download with keepalives is buggy with + // http.Client and our custom dial context. + DisableKeepAlives: true, + } + + // we use uploadRawPool for that + uploadTransport = nil + } + + client := reusedClient{ + download: &http.Client{ + Transport: downloadTransport, + }, + upload: &http.Client{ + Transport: uploadTransport, + }, + isH2: tlsConfig != nil, + uploadRawPool: &sync.Pool{}, + dialUploadConn: dialContext, + } + + globalDialerMap[dialerConf{dest, streamSettings}] = client + return client +} + +func init() { + common.Must(internet.RegisterTransportDialer(protocolName, Dial)) +} + +func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) { + newError("dialing splithttp to ", dest).WriteToLog(session.ExportIDToError(ctx)) + + var requestURL url.URL + + transportConfiguration := streamSettings.ProtocolSettings.(*Config) + tlsConfig := tls.ConfigFromStreamSettings(streamSettings) + + maxConcurrentUploads := transportConfiguration.GetNormalizedMaxConcurrentUploads() + maxUploadSize := transportConfiguration.GetNormalizedMaxUploadSize() + + if tlsConfig != nil { + requestURL.Scheme = "https" + } else { + requestURL.Scheme = "http" + } + requestURL.Host = transportConfiguration.Host + if requestURL.Host == "" { + requestURL.Host = dest.NetAddr() + } + requestURL.Path = transportConfiguration.GetNormalizedPath() + + httpClient := getHTTPClient(ctx, dest, streamSettings) + + var remoteAddr gonet.Addr + var localAddr gonet.Addr + + trace := &httptrace.ClientTrace{ + GotConn: func(connInfo httptrace.GotConnInfo) { + remoteAddr = connInfo.Conn.RemoteAddr() + localAddr = connInfo.Conn.LocalAddr() + }, + } + + sessionIdUuid := uuid.New() + sessionId := sessionIdUuid.String() + + req, err := http.NewRequestWithContext( + httptrace.WithClientTrace(ctx, trace), + "GET", + requestURL.String()+"?session="+sessionId, + nil, + ) + if err != nil { + return nil, err + } + + req.Header = transportConfiguration.GetRequestHeader() + + downResponse, err := httpClient.download.Do(req) + if err != nil { + // workaround for various connection pool related issues, mostly around + // HTTP/1.1. if the http client ever fails to send a request, we simply + // delete it entirely. + // in HTTP/1.1, it was observed that pool connections would immediately + // fail with "context canceled" if the previous http response body was + // not explicitly BOTH drained and closed. at the same time, sometimes + // the draining itself takes forever and causes more problems. + // see also https://github.com/golang/go/issues/60240 + destroyHTTPClient(ctx, dest, streamSettings) + return nil, newError("failed to send download http request, destroying client").Base(err) + } + + if downResponse.StatusCode != 200 { + downResponse.Body.Close() + return nil, newError("invalid status code on download:", downResponse.Status) + } + + uploadUrl := requestURL.String() + "?session=" + sessionId + "&seq=" + + uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize)) + + go func() { + requestsLimiter := semaphore.New(int(maxConcurrentUploads)) + var requestCounter int64 + + // by offloading the uploads into a buffered pipe, multiple conn.Write + // calls get automatically batched together into larger POST requests. + // without batching, bandwidth is extremely limited. + for { + chunk, err := uploadPipeReader.ReadMultiBuffer() + if err != nil { + break + } + + <-requestsLimiter.Wait() + + url := uploadUrl + strconv.FormatInt(requestCounter, 10) + requestCounter += 1 + + go func() { + defer requestsLimiter.Signal() + req, err := http.NewRequest("POST", url, &buf.MultiBufferContainer{MultiBuffer: chunk}) + if err != nil { + newError("failed to send upload").Base(err).WriteToLog() + uploadPipeReader.Interrupt() + return + } + + req.Header = transportConfiguration.GetRequestHeader() + + if httpClient.isH2 { + resp, err := httpClient.upload.Do(req) + if err != nil { + newError("failed to send upload").Base(err).WriteToLog() + uploadPipeReader.Interrupt() + return + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + newError("failed to send upload, bad status code:", resp.Status).WriteToLog() + uploadPipeReader.Interrupt() + return + } + } else { + var err error + var uploadConn any + for _ = range 5 { + uploadConn = httpClient.uploadRawPool.Get() + if uploadConn == nil { + uploadConn, err = httpClient.dialUploadConn(ctx) + if err != nil { + newError("failed to connect upload").Base(err).WriteToLog() + uploadPipeReader.Interrupt() + return + } + } + + err = req.Write(uploadConn.(net.Conn)) + if err == nil { + break + } + } + + if err != nil { + newError("failed to send upload").Base(err).WriteToLog() + uploadPipeReader.Interrupt() + return + } + + httpClient.uploadRawPool.Put(uploadConn) + } + }() + + } + }() + + // skip "ok" response + trashHeader := []byte{0, 0} + _, err = io.ReadFull(downResponse.Body, trashHeader) + if err != nil { + downResponse.Body.Close() + return nil, newError("failed to read initial response") + } + + // necessary in order to send larger chunks in upload + bufferedUploadPipeWriter := buf.NewBufferedWriter(uploadPipeWriter) + bufferedUploadPipeWriter.SetBuffered(false) + + conn := splitConn{ + writer: bufferedUploadPipeWriter, + reader: downResponse.Body, + remoteAddr: remoteAddr, + localAddr: localAddr, + } + + return stat.Connection(&conn), nil +} diff --git a/transport/internet/splithttp/errors.generated.go b/transport/internet/splithttp/errors.generated.go new file mode 100644 index 00000000..30fca9f6 --- /dev/null +++ b/transport/internet/splithttp/errors.generated.go @@ -0,0 +1,9 @@ +package splithttp + +import "github.com/xtls/xray-core/common/errors" + +type errPathObjHolder struct{} + +func newError(values ...interface{}) *errors.Error { + return errors.New(values...).WithPathObj(errPathObjHolder{}) +} diff --git a/transport/internet/splithttp/hub.go b/transport/internet/splithttp/hub.go new file mode 100644 index 00000000..1883bf23 --- /dev/null +++ b/transport/internet/splithttp/hub.go @@ -0,0 +1,262 @@ +package splithttp + +import ( + "context" + "crypto/tls" + "io" + gonet "net" + "net/http" + "strconv" + "sync" + "time" + + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/common/net" + http_proto "github.com/xtls/xray-core/common/protocol/http" + "github.com/xtls/xray-core/common/session" + "github.com/xtls/xray-core/common/signal/done" + "github.com/xtls/xray-core/transport/internet" + "github.com/xtls/xray-core/transport/internet/stat" + v2tls "github.com/xtls/xray-core/transport/internet/tls" +) + +type requestHandler struct { + host string + path string + ln *Listener + sessions sync.Map + localAddr gonet.TCPAddr +} + +func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { + if len(h.host) > 0 && request.Host != h.host { + newError("failed to validate host, request:", request.Host, ", config:", h.host).WriteToLog() + writer.WriteHeader(http.StatusNotFound) + return + } + if request.URL.Path != h.path { + newError("failed to validate path, request:", request.URL.Path, ", config:", h.path).WriteToLog() + writer.WriteHeader(http.StatusNotFound) + return + } + + queryString := request.URL.Query() + sessionId := queryString.Get("session") + if sessionId == "" { + newError("no sessionid on request:", request.URL.Path).WriteToLog() + writer.WriteHeader(http.StatusBadRequest) + return + } + + forwardedAddrs := http_proto.ParseXForwardedFor(request.Header) + remoteAddr, err := gonet.ResolveTCPAddr("tcp", request.RemoteAddr) + if err != nil { + remoteAddr = &gonet.TCPAddr{} + } + if len(forwardedAddrs) > 0 && forwardedAddrs[0].Family().IsIP() { + remoteAddr = &net.TCPAddr{ + IP: forwardedAddrs[0].IP(), + Port: int(0), + } + } + + if request.Method == "POST" { + uploadQueue, ok := h.sessions.Load(sessionId) + if !ok { + newError("sessionid does not exist").WriteToLog() + writer.WriteHeader(http.StatusBadRequest) + return + } + + seq := queryString.Get("seq") + if seq == "" { + newError("no seq on request:", request.URL.Path).WriteToLog() + writer.WriteHeader(http.StatusBadRequest) + return + } + + payload, err := io.ReadAll(request.Body) + if err != nil { + newError("failed to upload").Base(err).WriteToLog() + writer.WriteHeader(http.StatusInternalServerError) + return + } + + seqInt, err := strconv.ParseUint(seq, 10, 64) + if err != nil { + newError("failed to upload").Base(err).WriteToLog() + writer.WriteHeader(http.StatusInternalServerError) + return + } + + err = uploadQueue.(*UploadQueue).Push(Packet{ + Payload: payload, + Seq: seqInt, + }) + + if err != nil { + newError("failed to upload").Base(err).WriteToLog() + writer.WriteHeader(http.StatusInternalServerError) + return + } + + writer.WriteHeader(http.StatusOK) + } else if request.Method == "GET" { + responseFlusher, ok := writer.(http.Flusher) + if !ok { + panic("expected http.ResponseWriter to be an http.Flusher") + } + + uploadQueue := NewUploadQueue(int(2 * h.ln.config.GetNormalizedMaxConcurrentUploads())) + + h.sessions.Store(sessionId, uploadQueue) + // the connection is finished, clean up map + defer h.sessions.Delete(sessionId) + + // magic header instructs nginx + apache to not buffer response body + writer.Header().Set("X-Accel-Buffering", "no") + writer.WriteHeader(http.StatusOK) + // send a chunk immediately to enable CDN streaming. + // many CDN buffer the response headers until the origin starts sending + // the body, with no way to turn it off. + writer.Write([]byte("ok")) + responseFlusher.Flush() + + downloadDone := done.New() + + conn := splitConn{ + writer: &httpResponseBodyWriter{ + responseWriter: writer, + downloadDone: downloadDone, + responseFlusher: responseFlusher, + }, + reader: uploadQueue, + remoteAddr: remoteAddr, + } + + h.ln.addConn(stat.Connection(&conn)) + + // "A ResponseWriter may not be used after [Handler.ServeHTTP] has returned." + <-downloadDone.Wait() + + } else { + writer.WriteHeader(http.StatusMethodNotAllowed) + } +} + +type httpResponseBodyWriter struct { + sync.Mutex + responseWriter http.ResponseWriter + responseFlusher http.Flusher + downloadDone *done.Instance +} + +func (c *httpResponseBodyWriter) Write(b []byte) (int, error) { + c.Lock() + defer c.Unlock() + if c.downloadDone.Done() { + return 0, io.ErrClosedPipe + } + n, err := c.responseWriter.Write(b) + if err == nil { + c.responseFlusher.Flush() + } + return n, err +} + +func (c *httpResponseBodyWriter) Close() error { + c.Lock() + defer c.Unlock() + c.downloadDone.Close() + return nil +} + +type Listener struct { + sync.Mutex + server http.Server + listener net.Listener + config *Config + addConn internet.ConnHandler +} + +func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (internet.Listener, error) { + l := &Listener{ + addConn: addConn, + } + shSettings := streamSettings.ProtocolSettings.(*Config) + l.config = shSettings + if l.config != nil { + if streamSettings.SocketSettings == nil { + streamSettings.SocketSettings = &internet.SocketConfig{} + } + } + var listener net.Listener + var err error + var localAddr = gonet.TCPAddr{} + + if port == net.Port(0) { // unix + listener, err = internet.ListenSystem(ctx, &net.UnixAddr{ + Name: address.Domain(), + Net: "unix", + }, streamSettings.SocketSettings) + if err != nil { + return nil, newError("failed to listen unix domain socket(for SH) on ", address).Base(err) + } + newError("listening unix domain socket(for SH) on ", address).WriteToLog(session.ExportIDToError(ctx)) + } else { // tcp + localAddr = gonet.TCPAddr{ + IP: address.IP(), + Port: int(port), + } + listener, err = internet.ListenSystem(ctx, &net.TCPAddr{ + IP: address.IP(), + Port: int(port), + }, streamSettings.SocketSettings) + if err != nil { + return nil, newError("failed to listen TCP(for SH) on ", address, ":", port).Base(err) + } + newError("listening TCP(for SH) on ", address, ":", port).WriteToLog(session.ExportIDToError(ctx)) + } + + if config := v2tls.ConfigFromStreamSettings(streamSettings); config != nil { + if tlsConfig := config.GetTLSConfig(); tlsConfig != nil { + listener = tls.NewListener(listener, tlsConfig) + } + } + + l.listener = listener + + l.server = http.Server{ + Handler: &requestHandler{ + host: shSettings.Host, + path: shSettings.GetNormalizedPath(), + ln: l, + sessions: sync.Map{}, + localAddr: localAddr, + }, + ReadHeaderTimeout: time.Second * 4, + MaxHeaderBytes: 8192, + } + + go func() { + if err := l.server.Serve(l.listener); err != nil { + newError("failed to serve http for splithttp").Base(err).AtWarning().WriteToLog(session.ExportIDToError(ctx)) + } + }() + + return l, err +} + +// Addr implements net.Listener.Addr(). +func (ln *Listener) Addr() net.Addr { + return ln.listener.Addr() +} + +// Close implements net.Listener.Close(). +func (ln *Listener) Close() error { + return ln.listener.Close() +} + +func init() { + common.Must(internet.RegisterTransportListener(protocolName, ListenSH)) +} diff --git a/transport/internet/splithttp/splithttp.go b/transport/internet/splithttp/splithttp.go new file mode 100644 index 00000000..55d230df --- /dev/null +++ b/transport/internet/splithttp/splithttp.go @@ -0,0 +1,17 @@ +package splithttp + +import ( + "context" + + "github.com/xtls/xray-core/common" +) + +//go:generate go run github.com/xtls/xray-core/common/errors/errorgen + +const protocolName = "splithttp" + +func init() { + common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { + return nil, newError("splithttp is a transport protocol.") + })) +} diff --git a/transport/internet/splithttp/splithttp_test.go b/transport/internet/splithttp/splithttp_test.go new file mode 100644 index 00000000..3d3d387c --- /dev/null +++ b/transport/internet/splithttp/splithttp_test.go @@ -0,0 +1,154 @@ +package splithttp_test + +import ( + "context" + "fmt" + "runtime" + "testing" + "time" + + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/protocol/tls/cert" + "github.com/xtls/xray-core/testing/servers/tcp" + "github.com/xtls/xray-core/transport/internet" + . "github.com/xtls/xray-core/transport/internet/splithttp" + "github.com/xtls/xray-core/transport/internet/stat" + "github.com/xtls/xray-core/transport/internet/tls" +) + +func Test_listenSHAndDial(t *testing.T) { + listenPort := tcp.PickPort() + listen, err := ListenSH(context.Background(), net.LocalHostIP, listenPort, &internet.MemoryStreamConfig{ + ProtocolName: "splithttp", + ProtocolSettings: &Config{ + Path: "/sh", + }, + }, func(conn stat.Connection) { + go func(c stat.Connection) { + defer c.Close() + + var b [1024]byte + _, err := c.Read(b[:]) + if err != nil { + return + } + + common.Must2(c.Write([]byte("Response"))) + }(conn) + }) + common.Must(err) + ctx := context.Background() + streamSettings := &internet.MemoryStreamConfig{ + ProtocolName: "splithttp", + ProtocolSettings: &Config{Path: "sh"}, + } + conn, err := Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings) + + common.Must(err) + _, err = conn.Write([]byte("Test connection 1")) + common.Must(err) + + var b [1024]byte + fmt.Println("test2") + n, _ := conn.Read(b[:]) + fmt.Println("string is", n) + if string(b[:n]) != "Response" { + t.Error("response: ", string(b[:n])) + } + + common.Must(conn.Close()) + <-time.After(time.Second * 5) + conn, err = Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings) + common.Must(err) + _, err = conn.Write([]byte("Test connection 2")) + common.Must(err) + n, _ = conn.Read(b[:]) + common.Must(err) + if string(b[:n]) != "Response" { + t.Error("response: ", string(b[:n])) + } + common.Must(conn.Close()) + + common.Must(listen.Close()) +} + +func TestDialWithRemoteAddr(t *testing.T) { + listenPort := tcp.PickPort() + listen, err := ListenSH(context.Background(), net.LocalHostIP, listenPort, &internet.MemoryStreamConfig{ + ProtocolName: "splithttp", + ProtocolSettings: &Config{ + Path: "sh", + }, + }, func(conn stat.Connection) { + go func(c stat.Connection) { + defer c.Close() + + var b [1024]byte + _, err := c.Read(b[:]) + // common.Must(err) + if err != nil { + return + } + + _, err = c.Write([]byte("Response")) + common.Must(err) + }(conn) + }) + common.Must(err) + + conn, err := Dial(context.Background(), net.TCPDestination(net.DomainAddress("localhost"), listenPort), &internet.MemoryStreamConfig{ + ProtocolName: "splithttp", + ProtocolSettings: &Config{Path: "sh", Header: map[string]string{"X-Forwarded-For": "1.1.1.1"}}, + }) + + common.Must(err) + _, err = conn.Write([]byte("Test connection 1")) + common.Must(err) + + var b [1024]byte + n, _ := conn.Read(b[:]) + if string(b[:n]) != "Response" { + t.Error("response: ", string(b[:n])) + } + + common.Must(listen.Close()) +} + +func Test_listenSHAndDial_TLS(t *testing.T) { + if runtime.GOARCH == "arm64" { + return + } + + listenPort := tcp.PickPort() + + start := time.Now() + + streamSettings := &internet.MemoryStreamConfig{ + ProtocolName: "splithttp", + ProtocolSettings: &Config{ + Path: "shs", + }, + SecurityType: "tls", + SecuritySettings: &tls.Config{ + AllowInsecure: true, + Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil, cert.CommonName("localhost")))}, + }, + } + listen, err := ListenSH(context.Background(), net.LocalHostIP, listenPort, streamSettings, func(conn stat.Connection) { + go func() { + _ = conn.Close() + }() + }) + common.Must(err) + defer listen.Close() + + conn, err := Dial(context.Background(), net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings) + common.Must(err) + _ = conn.Close() + + end := time.Now() + if !end.Before(start.Add(time.Second * 5)) { + t.Error("end: ", end, " start: ", start) + } +} diff --git a/transport/internet/splithttp/upload_queue.go b/transport/internet/splithttp/upload_queue.go new file mode 100644 index 00000000..8b2c8f36 --- /dev/null +++ b/transport/internet/splithttp/upload_queue.go @@ -0,0 +1,120 @@ +package splithttp + +// upload_queue is a specialized priorityqueue + channel to reorder generic +// packets by a sequence number + +import ( + "container/heap" + "io" +) + +type Packet struct { + Payload []byte + Seq uint64 +} + +type UploadQueue struct { + pushedPackets chan Packet + heap uploadHeap + nextSeq uint64 + closed bool + maxPackets int +} + +func NewUploadQueue(maxPackets int) *UploadQueue { + return &UploadQueue{ + pushedPackets: make(chan Packet, maxPackets), + heap: uploadHeap{}, + nextSeq: 0, + closed: false, + maxPackets: maxPackets, + } +} + +func (h *UploadQueue) Push(p Packet) error { + if h.closed { + return newError("splithttp packet queue closed") + } + + h.pushedPackets <- p + return nil +} + +func (h *UploadQueue) Close() error { + h.closed = true + close(h.pushedPackets) + return nil +} + +func (h *UploadQueue) Read(b []byte) (int, error) { + if h.closed && len(h.heap) == 0 && len(h.pushedPackets) == 0 { + return 0, io.EOF + } + + needMorePackets := false + + if len(h.heap) > 0 { + packet := heap.Pop(&h.heap).(Packet) + n := 0 + + if packet.Seq == h.nextSeq { + copy(b, packet.Payload) + n = min(len(b), len(packet.Payload)) + + if n < len(packet.Payload) { + // partial read + packet.Payload = packet.Payload[n:] + heap.Push(&h.heap, packet) + } else { + h.nextSeq = packet.Seq + 1 + } + + return n, nil + } + + // misordered packet + if packet.Seq > h.nextSeq { + if len(h.heap) > h.maxPackets { + // the "reassembly buffer" is too large, and we want to + // constrain memory usage somehow. let's tear down the + // connection, and hope the application retries. + return 0, newError("packet queue is too large") + } + heap.Push(&h.heap, packet) + needMorePackets = true + } + } else { + needMorePackets = true + } + + if needMorePackets { + packet, more := <-h.pushedPackets + if !more { + return 0, io.EOF + } + heap.Push(&h.heap, packet) + } + + return 0, nil +} + +// heap code directly taken from https://pkg.go.dev/container/heap +type uploadHeap []Packet + +func (h uploadHeap) Len() int { return len(h) } +func (h uploadHeap) Less(i, j int) bool { return h[i].Seq < h[j].Seq } +func (h uploadHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *uploadHeap) Push(x any) { + // Push and Pop use pointer receivers because they modify the slice's length, + // not just its contents. + *h = append(*h, x.(Packet)) +} + +func (h *uploadHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +}