Experiment: pingpong seed

This commit is contained in:
yuhan6665 2024-09-21 22:02:08 -04:00
parent 80ce4a20f6
commit 7743a2b01f
11 changed files with 158 additions and 301 deletions

View file

@ -11,8 +11,8 @@ import (
type dataHandler func(MultiBuffer) type dataHandler func(MultiBuffer)
type copyHandler struct { type CopyHandler struct {
onData []dataHandler OnData []dataHandler
} }
// SizeCounter is for counting bytes copied by Copy(). // SizeCounter is for counting bytes copied by Copy().
@ -21,12 +21,12 @@ type SizeCounter struct {
} }
// CopyOption is an option for copying data. // CopyOption is an option for copying data.
type CopyOption func(*copyHandler) type CopyOption func(*CopyHandler)
// UpdateActivity is a CopyOption to update activity on each data copy operation. // UpdateActivity is a CopyOption to update activity on each data copy operation.
func UpdateActivity(timer signal.ActivityUpdater) CopyOption { func UpdateActivity(timer signal.ActivityUpdater) CopyOption {
return func(handler *copyHandler) { return func(handler *CopyHandler) {
handler.onData = append(handler.onData, func(MultiBuffer) { handler.OnData = append(handler.OnData, func(MultiBuffer) {
timer.Update() timer.Update()
}) })
} }
@ -34,8 +34,8 @@ func UpdateActivity(timer signal.ActivityUpdater) CopyOption {
// CountSize is a CopyOption that sums the total size of data copied into the given SizeCounter. // CountSize is a CopyOption that sums the total size of data copied into the given SizeCounter.
func CountSize(sc *SizeCounter) CopyOption { func CountSize(sc *SizeCounter) CopyOption {
return func(handler *copyHandler) { return func(handler *CopyHandler) {
handler.onData = append(handler.onData, func(b MultiBuffer) { handler.OnData = append(handler.OnData, func(b MultiBuffer) {
sc.Size += int64(b.Len()) sc.Size += int64(b.Len())
}) })
} }
@ -43,8 +43,8 @@ func CountSize(sc *SizeCounter) CopyOption {
// AddToStatCounter a CopyOption add to stat counter // AddToStatCounter a CopyOption add to stat counter
func AddToStatCounter(sc stats.Counter) CopyOption { func AddToStatCounter(sc stats.Counter) CopyOption {
return func(handler *copyHandler) { return func(handler *CopyHandler) {
handler.onData = append(handler.onData, func(b MultiBuffer) { handler.OnData = append(handler.OnData, func(b MultiBuffer) {
if sc != nil { if sc != nil {
sc.Add(int64(b.Len())) sc.Add(int64(b.Len()))
} }
@ -88,18 +88,17 @@ func IsWriteError(err error) bool {
return ok return ok
} }
func copyInternal(reader Reader, writer Writer, handler *copyHandler) error { func copyInternal(reader Reader, writer Writer, handler *CopyHandler) error {
for { for {
buffer, err := reader.ReadMultiBuffer() buffer, err := reader.ReadMultiBuffer()
if !buffer.IsEmpty() { if !buffer.IsEmpty() {
for _, handler := range handler.onData {
handler(buffer)
}
if werr := writer.WriteMultiBuffer(buffer); werr != nil { if werr := writer.WriteMultiBuffer(buffer); werr != nil {
return writeError{werr} return writeError{werr}
} }
} }
for _, handler := range handler.OnData {
handler(buffer)
}
if err != nil { if err != nil {
return readError{err} return readError{err}
@ -109,7 +108,7 @@ func copyInternal(reader Reader, writer Writer, handler *copyHandler) error {
// Copy dumps all payload from reader to writer or stops when an error occurs. It returns nil when EOF. // Copy dumps all payload from reader to writer or stops when an error occurs. It returns nil when EOF.
func Copy(reader Reader, writer Writer, options ...CopyOption) error { func Copy(reader Reader, writer Writer, options ...CopyOption) error {
var handler copyHandler var handler CopyHandler
for _, option := range options { for _, option := range options {
option(&handler) option(&handler)
} }

View file

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.33.0 // protoc-gen-go v1.34.2
// protoc v4.23.1 // protoc v5.27.0
// source: proxy/addons.proto // source: proxy/addons.proto
package proxy package proxy
@ -307,6 +307,7 @@ type SchedulerConfig struct {
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
TimeoutMillis uint32 `protobuf:"varint,1,opt,name=TimeoutMillis,proto3" json:"TimeoutMillis,omitempty"` // original traffic will not be sent right away but when scheduler want to send or pending buffer times out TimeoutMillis uint32 `protobuf:"varint,1,opt,name=TimeoutMillis,proto3" json:"TimeoutMillis,omitempty"` // original traffic will not be sent right away but when scheduler want to send or pending buffer times out
PingPong bool `protobuf:"varint,2,opt,name=PingPong,proto3" json:"PingPong,omitempty"` // Other TBD
} }
func (x *SchedulerConfig) Reset() { func (x *SchedulerConfig) Reset() {
@ -348,6 +349,13 @@ func (x *SchedulerConfig) GetTimeoutMillis() uint32 {
return 0 return 0
} }
func (x *SchedulerConfig) GetPingPong() bool {
if x != nil {
return x.PingPong
}
return false
}
var File_proxy_addons_proto protoreflect.FileDescriptor var File_proxy_addons_proto protoreflect.FileDescriptor
var file_proxy_addons_proto_rawDesc = []byte{ var file_proxy_addons_proto_rawDesc = []byte{
@ -385,21 +393,22 @@ var file_proxy_addons_proto_rawDesc = []byte{
0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x4d, 0x69, 0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x4d, 0x69,
0x6e, 0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x4d, 0x61, 0x78, 0x4d, 0x69, 0x6e, 0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x4d, 0x61, 0x78, 0x4d, 0x69,
0x6c, 0x6c, 0x69, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x4d, 0x61, 0x78, 0x4d, 0x6c, 0x6c, 0x69, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x4d, 0x61, 0x78, 0x4d,
0x69, 0x6c, 0x6c, 0x69, 0x73, 0x22, 0x37, 0x0a, 0x0f, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x22, 0x53, 0x0a, 0x0f, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c,
0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x24, 0x0a, 0x0d, 0x54, 0x69, 0x6d, 0x65, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x24, 0x0a, 0x0d, 0x54, 0x69, 0x6d, 0x65,
0x6f, 0x75, 0x74, 0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x6f, 0x75, 0x74, 0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52,
0x0d, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x2a, 0x58, 0x0d, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x12, 0x1a,
0x0a, 0x08, 0x53, 0x65, 0x65, 0x64, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x6e, 0x0a, 0x08, 0x50, 0x69, 0x6e, 0x67, 0x50, 0x6f, 0x6e, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08,
0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, 0x50, 0x61, 0x64, 0x64, 0x69, 0x52, 0x08, 0x50, 0x69, 0x6e, 0x67, 0x50, 0x6f, 0x6e, 0x67, 0x2a, 0x58, 0x0a, 0x08, 0x53, 0x65,
0x6e, 0x67, 0x4f, 0x6e, 0x6c, 0x79, 0x10, 0x01, 0x12, 0x14, 0x0a, 0x10, 0x50, 0x61, 0x64, 0x64, 0x65, 0x64, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x6e, 0x6b, 0x6e, 0x6f, 0x77,
0x69, 0x6e, 0x67, 0x50, 0x6c, 0x75, 0x73, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x10, 0x02, 0x12, 0x18, 0x6e, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, 0x50, 0x61, 0x64, 0x64, 0x69, 0x6e, 0x67, 0x4f, 0x6e,
0x0a, 0x14, 0x49, 0x6e, 0x64, 0x65, 0x70, 0x65, 0x6e, 0x64, 0x65, 0x6e, 0x74, 0x53, 0x63, 0x68, 0x6c, 0x79, 0x10, 0x01, 0x12, 0x14, 0x0a, 0x10, 0x50, 0x61, 0x64, 0x64, 0x69, 0x6e, 0x67, 0x50,
0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x10, 0x03, 0x42, 0x40, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x2e, 0x6c, 0x75, 0x73, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x10, 0x02, 0x12, 0x18, 0x0a, 0x14, 0x49, 0x6e,
0x78, 0x72, 0x61, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x50, 0x01, 0x5a, 0x1f, 0x67, 0x69, 0x64, 0x65, 0x70, 0x65, 0x6e, 0x64, 0x65, 0x6e, 0x74, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c,
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x65, 0x72, 0x10, 0x03, 0x42, 0x40, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79,
0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x78, 0x79, 0xaa, 0x02, 0x0a, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x50, 0x01, 0x5a, 0x1f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
0x58, 0x72, 0x61, 0x79, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63,
0x6f, 0x33, 0x6f, 0x72, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x78, 0x79, 0xaa, 0x02, 0x0a, 0x58, 0x72, 0x61, 0x79,
0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (
@ -416,7 +425,7 @@ func file_proxy_addons_proto_rawDescGZIP() []byte {
var file_proxy_addons_proto_enumTypes = make([]protoimpl.EnumInfo, 1) var file_proxy_addons_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_proxy_addons_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_proxy_addons_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_proxy_addons_proto_goTypes = []interface{}{ var file_proxy_addons_proto_goTypes = []any{
(SeedMode)(0), // 0: xray.proxy.SeedMode (SeedMode)(0), // 0: xray.proxy.SeedMode
(*Addons)(nil), // 1: xray.proxy.Addons (*Addons)(nil), // 1: xray.proxy.Addons
(*PaddingConfig)(nil), // 2: xray.proxy.PaddingConfig (*PaddingConfig)(nil), // 2: xray.proxy.PaddingConfig
@ -441,7 +450,7 @@ func file_proxy_addons_proto_init() {
return return
} }
if !protoimpl.UnsafeEnabled { if !protoimpl.UnsafeEnabled {
file_proxy_addons_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { file_proxy_addons_proto_msgTypes[0].Exporter = func(v any, i int) any {
switch v := v.(*Addons); i { switch v := v.(*Addons); i {
case 0: case 0:
return &v.state return &v.state
@ -453,7 +462,7 @@ func file_proxy_addons_proto_init() {
return nil return nil
} }
} }
file_proxy_addons_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { file_proxy_addons_proto_msgTypes[1].Exporter = func(v any, i int) any {
switch v := v.(*PaddingConfig); i { switch v := v.(*PaddingConfig); i {
case 0: case 0:
return &v.state return &v.state
@ -465,7 +474,7 @@ func file_proxy_addons_proto_init() {
return nil return nil
} }
} }
file_proxy_addons_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { file_proxy_addons_proto_msgTypes[2].Exporter = func(v any, i int) any {
switch v := v.(*DelayConfig); i { switch v := v.(*DelayConfig); i {
case 0: case 0:
return &v.state return &v.state
@ -477,7 +486,7 @@ func file_proxy_addons_proto_init() {
return nil return nil
} }
} }
file_proxy_addons_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { file_proxy_addons_proto_msgTypes[3].Exporter = func(v any, i int) any {
switch v := v.(*SchedulerConfig); i { switch v := v.(*SchedulerConfig); i {
case 0: case 0:
return &v.state return &v.state

View file

@ -38,5 +38,6 @@ message DelayConfig {
message SchedulerConfig { message SchedulerConfig {
uint32 TimeoutMillis = 1; // original traffic will not be sent right away but when scheduler want to send or pending buffer times out uint32 TimeoutMillis = 1; // original traffic will not be sent right away but when scheduler want to send or pending buffer times out
bool PingPong = 2;
// Other TBD // Other TBD
} }

View file

@ -231,7 +231,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
inTimer = inbound.Timer inTimer = inbound.Timer
} }
if !isTLSConn(conn) { // it would be tls conn in special use case of MITM, we need to let link handle traffic if !isTLSConn(conn) { // it would be tls conn in special use case of MITM, we need to let link handle traffic
return proxy.CopyRawConnIfExist(ctx, conn, writeConn, link.Writer, timer, inTimer) return proxy.CopyRawConnIfExist(ctx, conn, writeConn, link.Writer, timer, inTimer, nil)
} }
} }
var reader buf.Reader var reader buf.Reader

View file

@ -216,7 +216,7 @@ type VisionWriter struct {
trafficState *TrafficState trafficState *TrafficState
ctx context.Context ctx context.Context
writeOnceUserUUID *[]byte writeOnceUserUUID *[]byte
scheduler *Scheduler Scheduler *Scheduler
} }
func NewVisionWriter(writer buf.Writer, addon *Addons, state *TrafficState, context context.Context) *VisionWriter { func NewVisionWriter(writer buf.Writer, addon *Addons, state *TrafficState, context context.Context) *VisionWriter {
@ -228,7 +228,7 @@ func NewVisionWriter(writer buf.Writer, addon *Addons, state *TrafficState, cont
trafficState: state, trafficState: state,
ctx: context, ctx: context,
writeOnceUserUUID: &w, writeOnceUserUUID: &w,
scheduler: NewScheduler(writer, addon, state, &w, context), Scheduler: NewScheduler(writer, addon, state, &w, context),
} }
} }
@ -281,12 +281,24 @@ func (w *VisionWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
if w.trafficState.StartTime.IsZero() { if w.trafficState.StartTime.IsZero() {
w.trafficState.StartTime = time.Now() w.trafficState.StartTime = time.Now()
} }
w.scheduler.Buffer <- mb w.Scheduler.Buffer <- mb
if w.addons.Scheduler == nil { w.Scheduler.Trigger <- -1 // send all buffers if no independent scheduler
w.scheduler.Trigger <- -1 // send all buffers if w.addons.Scheduler != nil {
w.Scheduler.TimeoutLock.Lock()
w.Scheduler.TimeoutCounter++
w.Scheduler.TimeoutLock.Unlock()
go func() {
time.Sleep(time.Duration(w.addons.Scheduler.TimeoutMillis) * time.Millisecond)
w.Scheduler.TimeoutLock.Lock()
w.Scheduler.TimeoutCounter--
if w.Scheduler.TimeoutCounter == 0 {
w.Scheduler.Trigger <- 0 // send when the latest buffer timeout
}
w.Scheduler.TimeoutLock.Unlock()
}()
} }
if len(w.scheduler.Error) > 0 { if len(w.Scheduler.Error) > 0 {
return <-w.scheduler.Error return <-w.Scheduler.Error
} }
return nil return nil
} }
@ -518,7 +530,7 @@ func UnwrapRawConn(conn net.Conn) (net.Conn, stats.Counter, stats.Counter) {
// CopyRawConnIfExist use the most efficient copy method. // CopyRawConnIfExist use the most efficient copy method.
// - If caller don't want to turn on splice, do not pass in both reader conn and writer conn // - If caller don't want to turn on splice, do not pass in both reader conn and writer conn
// - writer are from *transport.Link // - writer are from *transport.Link
func CopyRawConnIfExist(ctx context.Context, readerConn net.Conn, writerConn net.Conn, writer buf.Writer, timer *signal.ActivityTimer, inTimer *signal.ActivityTimer) error { func CopyRawConnIfExist(ctx context.Context, readerConn net.Conn, writerConn net.Conn, writer buf.Writer, timer *signal.ActivityTimer, inTimer *signal.ActivityTimer, scheduler *Scheduler) error {
readerConn, readCounter, _ := UnwrapRawConn(readerConn) readerConn, readCounter, _ := UnwrapRawConn(readerConn)
writerConn, _, writeCounter := UnwrapRawConn(writerConn) writerConn, _, writeCounter := UnwrapRawConn(writerConn)
reader := buf.NewReader(readerConn) reader := buf.NewReader(readerConn)
@ -581,10 +593,13 @@ func CopyRawConnIfExist(ctx context.Context, readerConn net.Conn, writerConn net
if readCounter != nil { if readCounter != nil {
readCounter.Add(int64(buffer.Len())) readCounter.Add(int64(buffer.Len()))
} }
timer.Update()
if werr := writer.WriteMultiBuffer(buffer); werr != nil { if werr := writer.WriteMultiBuffer(buffer); werr != nil {
return werr return werr
} }
timer.Update()
}
if scheduler != nil {
scheduler.Trigger <- 2
} }
if err != nil { if err != nil {
return err return err

View file

@ -15,6 +15,8 @@ type Scheduler struct {
Buffer chan buf.MultiBuffer Buffer chan buf.MultiBuffer
Trigger chan int Trigger chan int
Error chan error Error chan error
TimeoutCounter int
TimeoutLock *sync.Mutex
closed chan int closed chan int
bufferReadLock *sync.Mutex bufferReadLock *sync.Mutex
writer buf.Writer writer buf.Writer
@ -24,11 +26,21 @@ type Scheduler struct {
ctx context.Context ctx context.Context
} }
func TriggerScheduler(scheduler *Scheduler) buf.CopyOption {
return func(handler *buf.CopyHandler) {
handler.OnData = append(handler.OnData, func(buf.MultiBuffer) {
scheduler.Trigger <- 2 // send fake buffer if no pending
})
}
}
func NewScheduler(w buf.Writer, addon *Addons, state *TrafficState, userUUID *[]byte, context context.Context) *Scheduler { func NewScheduler(w buf.Writer, addon *Addons, state *TrafficState, userUUID *[]byte, context context.Context) *Scheduler {
var s = Scheduler{ var s = Scheduler{
Buffer: make(chan buf.MultiBuffer, 100), Buffer: make(chan buf.MultiBuffer, 100),
Trigger: make(chan int), Trigger: make(chan int),
Error: make(chan error, 100), Error: make(chan error, 100),
TimeoutCounter: 0,
TimeoutLock: new(sync.Mutex),
closed: make(chan int), closed: make(chan int),
bufferReadLock: new(sync.Mutex), bufferReadLock: new(sync.Mutex),
writer: w, writer: w,
@ -37,11 +49,14 @@ func NewScheduler(w buf.Writer, addon *Addons, state *TrafficState, userUUID *[]
writeOnceUserUUID: userUUID, writeOnceUserUUID: userUUID,
ctx: context, ctx: context,
} }
return &s
}
func(s *Scheduler) Start() {
go s.mainLoop() go s.mainLoop()
if s.addons.Scheduler != nil { if s.addons.Scheduler != nil && !s.addons.Scheduler.PingPong {
go s.exampleIndependentScheduler() go s.exampleIndependentScheduler()
} }
return &s
} }
func(s *Scheduler) mainLoop() { func(s *Scheduler) mainLoop() {
@ -49,6 +64,12 @@ func(s *Scheduler) mainLoop() {
if len(s.closed) > 0 { if len(s.closed) > 0 {
return return
} }
if trigger == -1 && s.addons.Scheduler != nil {
continue
}
if trigger == 2 && (s.addons.Scheduler == nil || !s.addons.Scheduler.PingPong) {
continue
}
go func() { // each trigger has independent delay, trigger does not block go func() { // each trigger has independent delay, trigger does not block
var d = 0 * time.Millisecond var d = 0 * time.Millisecond
if s.addons.Delay != nil { if s.addons.Delay != nil {

View file

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"io" "io"
"strings"
"github.com/xtls/xray-core/common/buf" "github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/errors"
@ -54,15 +55,6 @@ func DecodeHeaderAddons(buffer *buf.Buffer, reader io.Reader) (*proxy.Addons, er
return addons, nil return addons, nil
} }
// EncodeBodyAddons returns a Writer that auto-encrypt content written by caller.
func EncodeBodyAddons(writer buf.Writer, request *protocol.RequestHeader, addons *proxy.Addons, state *proxy.TrafficState, context context.Context) buf.Writer {
w := proxy.NewVisionWriter(writer, addons, state, context)
if request.Command == protocol.RequestCommandUDP {
return NewMultiLengthPacketWriter(w)
}
return w
}
// DecodeBodyAddons returns a Reader from which caller can fetch decrypted body. // DecodeBodyAddons returns a Reader from which caller can fetch decrypted body.
func DecodeBodyAddons(reader io.Reader, request *protocol.RequestHeader, addons *proxy.Addons, state *proxy.TrafficState, context context.Context) buf.Reader { func DecodeBodyAddons(reader io.Reader, request *protocol.RequestHeader, addons *proxy.Addons, state *proxy.TrafficState, context context.Context) buf.Reader {
r := proxy.NewVisionReader(buf.NewReader(reader), addons, state, context) r := proxy.NewVisionReader(buf.NewReader(reader), addons, state, context)
@ -181,7 +173,7 @@ func (r *LengthPacketReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
func PopulateSeed(seed string, addons *proxy.Addons) { func PopulateSeed(seed string, addons *proxy.Addons) {
if len(seed) > 0 { if len(seed) > 0 {
addons.Seed = []byte {1} // only turn on, more TBD addons.Seed = []byte {1} // only turn on, more TBD
addons.Mode = proxy.SeedMode_PaddingPlusDelay addons.Mode = proxy.SeedMode_IndependentScheduler
addons.Duration = "0-8" addons.Duration = "0-8"
addons.Padding = &proxy.PaddingConfig{ addons.Padding = &proxy.PaddingConfig{
RegularMin: 0, RegularMin: 0,
@ -196,6 +188,7 @@ func PopulateSeed(seed string, addons *proxy.Addons) {
// } // }
addons.Scheduler = &proxy.SchedulerConfig{ addons.Scheduler = &proxy.SchedulerConfig{
TimeoutMillis: 600, TimeoutMillis: 600,
PingPong: strings.Contains(seed, "pingpong"),
} }
} else if addons.Flow == vless.XRV { } else if addons.Flow == vless.XRV {
addons.Seed = []byte {1} // only turn on, more TBD addons.Seed = []byte {1} // only turn on, more TBD
@ -244,7 +237,8 @@ func CheckSeed(requestAddons *proxy.Addons, responseAddons *proxy.Addons) error
return errors.New("Delay of one is nil but the other is not nil") return errors.New("Delay of one is nil but the other is not nil")
} }
if requestAddons.Scheduler != nil && responseAddons.Scheduler != nil { if requestAddons.Scheduler != nil && responseAddons.Scheduler != nil {
if requestAddons.Scheduler.TimeoutMillis != responseAddons.Scheduler.TimeoutMillis { if requestAddons.Scheduler.TimeoutMillis != responseAddons.Scheduler.TimeoutMillis ||
requestAddons.Scheduler.PingPong != responseAddons.Scheduler.PingPong {
return errors.New("Scheduler not match") return errors.New("Scheduler not match")
} }
} else if requestAddons.Scheduler != nil || responseAddons.Scheduler != nil { } else if requestAddons.Scheduler != nil || responseAddons.Scheduler != nil {

View file

@ -172,7 +172,7 @@ func DecodeResponseHeader(reader io.Reader, request *protocol.RequestHeader) (*p
} }
// XtlsRead filter and read xtls protocol // XtlsRead filter and read xtls protocol
func XtlsRead(reader buf.Reader, writer buf.Writer, timer *signal.ActivityTimer, conn net.Conn, input *bytes.Reader, rawInput *bytes.Buffer, trafficState *proxy.TrafficState, ob *session.Outbound, ctx context.Context) error { func XtlsRead(reader buf.Reader, writer buf.Writer, timer *signal.ActivityTimer, scheduler *proxy.Scheduler, conn net.Conn, input *bytes.Reader, rawInput *bytes.Buffer, trafficState *proxy.TrafficState, ob *session.Outbound, ctx context.Context) error {
err := func() error { err := func() error {
for { for {
if trafficState.ReaderSwitchToDirectCopy { if trafficState.ReaderSwitchToDirectCopy {
@ -188,11 +188,10 @@ func XtlsRead(reader buf.Reader, writer buf.Writer, timer *signal.ActivityTimer,
ob.CanSpliceCopy = 1 ob.CanSpliceCopy = 1
} }
} }
return proxy.CopyRawConnIfExist(ctx, conn, writerConn, writer, timer, inTimer) return proxy.CopyRawConnIfExist(ctx, conn, writerConn, writer, timer, inTimer, scheduler)
} }
buffer, err := reader.ReadMultiBuffer() buffer, err := reader.ReadMultiBuffer()
if !buffer.IsEmpty() { if !buffer.IsEmpty() {
timer.Update()
if trafficState.ReaderSwitchToDirectCopy { if trafficState.ReaderSwitchToDirectCopy {
// XTLS Vision processes struct TLS Conn's input and rawInput // XTLS Vision processes struct TLS Conn's input and rawInput
if inputBuffer, err := buf.ReadFrom(input); err == nil { if inputBuffer, err := buf.ReadFrom(input); err == nil {
@ -209,7 +208,9 @@ func XtlsRead(reader buf.Reader, writer buf.Writer, timer *signal.ActivityTimer,
if werr := writer.WriteMultiBuffer(buffer); werr != nil { if werr := writer.WriteMultiBuffer(buffer); werr != nil {
return werr return werr
} }
timer.Update()
} }
scheduler.Trigger <- 2
if err != nil { if err != nil {
return err return err
} }

View file

@ -532,6 +532,12 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
serverReader := link.Reader // .(*pipe.Reader) serverReader := link.Reader // .(*pipe.Reader)
serverWriter := link.Writer // .(*pipe.Writer) serverWriter := link.Writer // .(*pipe.Writer)
trafficState := proxy.NewTrafficState(account.ID.Bytes(), account.Flow) trafficState := proxy.NewTrafficState(account.ID.Bytes(), account.Flow)
bufferWriter := buf.NewBufferedWriter(buf.NewWriter(connection))
var clientWriter buf.Writer
v := proxy.NewVisionWriter(bufferWriter, requestAddons, trafficState, ctx)
scheduler := v.Scheduler
clientWriter = v
postRequest := func() error { postRequest := func() error {
defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
@ -542,10 +548,10 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
if requestAddons.Flow == vless.XRV { if requestAddons.Flow == vless.XRV {
ctx1 := session.ContextWithInbound(ctx, nil) // TODO enable splice ctx1 := session.ContextWithInbound(ctx, nil) // TODO enable splice
err = encoding.XtlsRead(clientReader, serverWriter, timer, connection, input, rawInput, trafficState, nil, ctx1) err = encoding.XtlsRead(clientReader, serverWriter, timer, scheduler, connection, input, rawInput, trafficState, nil, ctx1)
} else { } else {
// from clientReader.ReadMultiBuffer to serverWriter.WriteMultiBuffer // from clientReader.ReadMultiBuffer to serverWriter.WriteMultiBuffer
err = buf.Copy(clientReader, serverWriter, buf.UpdateActivity(timer)) err = buf.Copy(clientReader, serverWriter, buf.UpdateActivity(timer), proxy.TriggerScheduler(scheduler))
} }
if err != nil { if err != nil {
@ -558,13 +564,14 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
getResponse := func() error { getResponse := func() error {
defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly) defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
bufferWriter := buf.NewBufferedWriter(buf.NewWriter(connection))
if err := encoding.EncodeResponseHeader(bufferWriter, request, responseAddons); err != nil { if err := encoding.EncodeResponseHeader(bufferWriter, request, responseAddons); err != nil {
return errors.New("failed to encode response header").Base(err).AtWarning() return errors.New("failed to encode response header").Base(err).AtWarning()
} }
// default: clientWriter := bufferWriter scheduler.Start()
clientWriter := encoding.EncodeBodyAddons(bufferWriter, request, responseAddons, trafficState, ctx) if request.Command == protocol.RequestCommandUDP {
clientWriter = encoding.NewMultiLengthPacketWriter(clientWriter)
}
multiBuffer, err1 := serverReader.ReadMultiBuffer() multiBuffer, err1 := serverReader.ReadMultiBuffer()
if err1 != nil { if err1 != nil {
return err1 // ... return err1 // ...

View file

@ -185,18 +185,23 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
request.Address = net.DomainAddress("v1.mux.cool") request.Address = net.DomainAddress("v1.mux.cool")
request.Port = net.Port(666) request.Port = net.Port(666)
} }
bufferWriter := buf.NewBufferedWriter(buf.NewWriter(conn))
var serverWriter buf.Writer
v := proxy.NewVisionWriter(bufferWriter, requestAddons, trafficState, ctx)
scheduler := v.Scheduler
serverWriter = v
postRequest := func() error { postRequest := func() error {
defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
bufferWriter := buf.NewBufferedWriter(buf.NewWriter(conn))
if err := encoding.EncodeRequestHeader(bufferWriter, request, requestAddons); err != nil { if err := encoding.EncodeRequestHeader(bufferWriter, request, requestAddons); err != nil {
return errors.New("failed to encode request header").Base(err).AtWarning() return errors.New("failed to encode request header").Base(err).AtWarning()
} }
// default: serverWriter := bufferWriter scheduler.Start()
serverWriter := encoding.EncodeBodyAddons(bufferWriter, request, requestAddons, trafficState, ctx) if request.Command == protocol.RequestCommandUDP {
if request.Command == protocol.RequestCommandMux && request.Port == 666 { serverWriter = encoding.NewMultiLengthPacketWriter(serverWriter)
} else if request.Command == protocol.RequestCommandMux && request.Port == 666 {
serverWriter = xudp.NewPacketWriter(serverWriter, target, xudp.GetGlobalID(ctx)) serverWriter = xudp.NewPacketWriter(serverWriter, target, xudp.GetGlobalID(ctx))
} }
timeoutReader, ok := clientReader.(buf.TimeoutReader) timeoutReader, ok := clientReader.(buf.TimeoutReader)
@ -222,6 +227,11 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
if err := bufferWriter.SetBuffered(false); err != nil { if err := bufferWriter.SetBuffered(false); err != nil {
return errors.New("failed to write A request payload").Base(err).AtWarning() return errors.New("failed to write A request payload").Base(err).AtWarning()
} }
if requestAddons.Scheduler != nil && requestAddons.Scheduler.PingPong {
go func() {
scheduler.Trigger <- 2 // client kickstart the pingpong!
}()
}
var err error var err error
if requestAddons.Flow == vless.XRV { if requestAddons.Flow == vless.XRV {
@ -266,10 +276,10 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
} }
if requestAddons.Flow == vless.XRV { if requestAddons.Flow == vless.XRV {
err = encoding.XtlsRead(serverReader, clientWriter, timer, conn, input, rawInput, trafficState, ob, ctx) err = encoding.XtlsRead(serverReader, clientWriter, timer, scheduler, conn, input, rawInput, trafficState, ob, ctx)
} else { } else {
// from serverReader.ReadMultiBuffer to clientWriter.WriteMultiBuffer // from serverReader.ReadMultiBuffer to clientWriter.WriteMultiBuffer
err = buf.Copy(serverReader, clientWriter, buf.UpdateActivity(timer)) err = buf.Copy(serverReader, clientWriter, buf.UpdateActivity(timer), proxy.TriggerScheduler(scheduler))
} }
if err != nil { if err != nil {

View file

@ -30,6 +30,18 @@ import (
) )
func TestVless(t *testing.T) { func TestVless(t *testing.T) {
testVlessSeed(t, "")
}
func TestVlessSeedWithFixedTrigger(t *testing.T) {
testVlessSeed(t, "1")
}
func TestVlessSeedWithPingPong(t *testing.T) {
testVlessSeed(t, "pingpong")
}
func testVlessSeed(t *testing.T, seed string) {
tcpServer := tcp.Server{ tcpServer := tcp.Server{
MsgProcessor: xor, MsgProcessor: xor,
} }
@ -57,6 +69,7 @@ func TestVless(t *testing.T) {
{ {
Account: serial.ToTypedMessage(&vless.Account{ Account: serial.ToTypedMessage(&vless.Account{
Id: userID.String(), Id: userID.String(),
Seed: seed,
}), }),
}, },
}, },
@ -102,106 +115,7 @@ func TestVless(t *testing.T) {
{ {
Account: serial.ToTypedMessage(&vless.Account{ Account: serial.ToTypedMessage(&vless.Account{
Id: userID.String(), Id: userID.String(),
}), Seed: seed,
},
},
},
},
}),
},
},
}
servers, err := InitializeServerConfigs(serverConfig, clientConfig)
common.Must(err)
defer CloseAllServers(servers)
var errg errgroup.Group
for i := 0; i < 10; i++ {
errg.Go(testTCPConn(clientPort, 1024*1024, time.Second*30))
}
if err := errg.Wait(); err != nil {
t.Error(err)
}
}
func TestVlessSeedWithIndependentScheduler(t *testing.T) {
tcpServer := tcp.Server{
MsgProcessor: xor,
}
dest, err := tcpServer.Start()
common.Must(err)
defer tcpServer.Close()
userID := protocol.NewID(uuid.New())
serverPort := tcp.PickPort()
serverConfig := &core.Config{
App: []*serial.TypedMessage{
serial.ToTypedMessage(&log.Config{
ErrorLogLevel: clog.Severity_Debug,
ErrorLogType: log.LogType_Console,
}),
},
Inbound: []*core.InboundHandlerConfig{
{
ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{
PortList: &net.PortList{Range: []*net.PortRange{net.SinglePortRange(serverPort)}},
Listen: net.NewIPOrDomain(net.LocalHostIP),
}),
ProxySettings: serial.ToTypedMessage(&inbound.Config{
Clients: []*protocol.User{
{
Account: serial.ToTypedMessage(&vless.Account{
Id: userID.String(),
Seed: "1",
}),
},
},
}),
},
},
Outbound: []*core.OutboundHandlerConfig{
{
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
},
},
}
clientPort := tcp.PickPort()
clientConfig := &core.Config{
App: []*serial.TypedMessage{
serial.ToTypedMessage(&log.Config{
ErrorLogLevel: clog.Severity_Debug,
ErrorLogType: log.LogType_Console,
}),
},
Inbound: []*core.InboundHandlerConfig{
{
ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{
PortList: &net.PortList{Range: []*net.PortRange{net.SinglePortRange(clientPort)}},
Listen: net.NewIPOrDomain(net.LocalHostIP),
}),
ProxySettings: serial.ToTypedMessage(&dokodemo.Config{
Address: net.NewIPOrDomain(dest.Address),
Port: uint32(dest.Port),
NetworkList: &net.NetworkList{
Network: []net.Network{net.Network_TCP},
},
}),
},
},
Outbound: []*core.OutboundHandlerConfig{
{
ProxySettings: serial.ToTypedMessage(&outbound.Config{
Vnext: []*protocol.ServerEndpoint{
{
Address: net.NewIPOrDomain(net.LocalHostIP),
Port: uint32(serverPort),
User: []*protocol.User{
{
Account: serial.ToTypedMessage(&vless.Account{
Id: userID.String(),
Seed: "1",
}), }),
}, },
}, },
@ -348,6 +262,18 @@ func TestVlessTls(t *testing.T) {
} }
func TestVlessXtlsVision(t *testing.T) { func TestVlessXtlsVision(t *testing.T) {
testVlessXtlsVisionWithSeed(t, "")
}
func TestVlessXtlsVisionWithFixedTrigger(t *testing.T) {
testVlessXtlsVisionWithSeed(t, "1")
}
func TestVlessXtlsVisionWithPingPong(t *testing.T) {
testVlessXtlsVisionWithSeed(t, "pingpong")
}
func testVlessXtlsVisionWithSeed(t *testing.T, seed string) {
tcpServer := tcp.Server{ tcpServer := tcp.Server{
MsgProcessor: xor, MsgProcessor: xor,
} }
@ -385,6 +311,7 @@ func TestVlessXtlsVision(t *testing.T) {
Account: serial.ToTypedMessage(&vless.Account{ Account: serial.ToTypedMessage(&vless.Account{
Id: userID.String(), Id: userID.String(),
Flow: vless.XRV, Flow: vless.XRV,
Seed: seed,
}), }),
}, },
}, },
@ -431,6 +358,7 @@ func TestVlessXtlsVision(t *testing.T) {
Account: serial.ToTypedMessage(&vless.Account{ Account: serial.ToTypedMessage(&vless.Account{
Id: userID.String(), Id: userID.String(),
Flow: vless.XRV, Flow: vless.XRV,
Seed: seed,
}), }),
}, },
}, },
@ -443,134 +371,6 @@ func TestVlessXtlsVision(t *testing.T) {
TransportSettings: []*internet.TransportConfig{ TransportSettings: []*internet.TransportConfig{
{ {
ProtocolName: "tcp", ProtocolName: "tcp",
Settings: serial.ToTypedMessage(&transtcp.Config{}),
},
},
SecurityType: serial.GetMessageType(&tls.Config{}),
SecuritySettings: []*serial.TypedMessage{
serial.ToTypedMessage(&tls.Config{
AllowInsecure: true,
}),
},
},
}),
},
},
}
servers, err := InitializeServerConfigs(serverConfig, clientConfig)
common.Must(err)
defer CloseAllServers(servers)
var errg errgroup.Group
for i := 0; i < 10; i++ {
errg.Go(testTCPConn(clientPort, 1024*1024, time.Second*30))
}
if err := errg.Wait(); err != nil {
t.Error(err)
}
}
func TestVlessXtlsVisionWithSeed(t *testing.T) {
tcpServer := tcp.Server{
MsgProcessor: xor,
}
dest, err := tcpServer.Start()
common.Must(err)
defer tcpServer.Close()
userID := protocol.NewID(uuid.New())
serverPort := tcp.PickPort()
serverConfig := &core.Config{
App: []*serial.TypedMessage{
serial.ToTypedMessage(&log.Config{
ErrorLogLevel: clog.Severity_Debug,
ErrorLogType: log.LogType_Console,
}),
},
Inbound: []*core.InboundHandlerConfig{
{
ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{
PortList: &net.PortList{Range: []*net.PortRange{net.SinglePortRange(serverPort)}},
Listen: net.NewIPOrDomain(net.LocalHostIP),
StreamSettings: &internet.StreamConfig{
Protocol: internet.TransportProtocol_TCP,
SecurityType: serial.GetMessageType(&tls.Config{}),
SecuritySettings: []*serial.TypedMessage{
serial.ToTypedMessage(&tls.Config{
Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil))},
}),
},
},
}),
ProxySettings: serial.ToTypedMessage(&inbound.Config{
Clients: []*protocol.User{
{
Account: serial.ToTypedMessage(&vless.Account{
Id: userID.String(),
Flow: vless.XRV,
Seed: "1",
}),
},
},
}),
},
},
Outbound: []*core.OutboundHandlerConfig{
{
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
},
},
}
clientPort := tcp.PickPort()
clientConfig := &core.Config{
App: []*serial.TypedMessage{
serial.ToTypedMessage(&log.Config{
ErrorLogLevel: clog.Severity_Debug,
ErrorLogType: log.LogType_Console,
}),
},
Inbound: []*core.InboundHandlerConfig{
{
ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{
PortList: &net.PortList{Range: []*net.PortRange{net.SinglePortRange(clientPort)}},
Listen: net.NewIPOrDomain(net.LocalHostIP),
}),
ProxySettings: serial.ToTypedMessage(&dokodemo.Config{
Address: net.NewIPOrDomain(dest.Address),
Port: uint32(dest.Port),
NetworkList: &net.NetworkList{
Network: []net.Network{net.Network_TCP},
},
}),
},
},
Outbound: []*core.OutboundHandlerConfig{
{
ProxySettings: serial.ToTypedMessage(&outbound.Config{
Vnext: []*protocol.ServerEndpoint{
{
Address: net.NewIPOrDomain(net.LocalHostIP),
Port: uint32(serverPort),
User: []*protocol.User{
{
Account: serial.ToTypedMessage(&vless.Account{
Id: userID.String(),
Flow: vless.XRV,
Seed: "1",
}),
},
},
},
},
}),
SenderSettings: serial.ToTypedMessage(&proxyman.SenderConfig{
StreamSettings: &internet.StreamConfig{
Protocol: internet.TransportProtocol_TCP,
TransportSettings: []*internet.TransportConfig{
{
Protocol: internet.TransportProtocol_TCP,
Settings: serial.ToTypedMessage(&transtcp.Config{}), Settings: serial.ToTypedMessage(&transtcp.Config{}),
}, },
}, },