Add proxy Scheduler

This commit is contained in:
yuhan6665 2024-04-26 10:00:56 -04:00
parent 5eba379a53
commit bbdac34db7
2 changed files with 86 additions and 3 deletions

View file

@ -155,7 +155,7 @@ func NewTrafficState(userUUID []byte, flow string) *TrafficState {
return &state return &state
} }
// VisionReader is used to read xtls vision protocol // VisionReader is used to read seed protocol
// Note Vision probably only make sense as the inner most layer of reader, since it need assess traffic state from origin proxy traffic // Note Vision probably only make sense as the inner most layer of reader, since it need assess traffic state from origin proxy traffic
type VisionReader struct { type VisionReader struct {
buf.Reader buf.Reader
@ -208,7 +208,7 @@ func (w *VisionReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
return buffer, err return buffer, err
} }
// VisionWriter is used to write xtls vision protocol // VisionWriter is used to write seed protocol
// Note Vision probably only make sense as the inner most layer of writer, since it need assess traffic state from origin proxy traffic // Note Vision probably only make sense as the inner most layer of writer, since it need assess traffic state from origin proxy traffic
type VisionWriter struct { type VisionWriter struct {
buf.Writer buf.Writer
@ -216,6 +216,7 @@ type VisionWriter struct {
trafficState *TrafficState trafficState *TrafficState
ctx context.Context ctx context.Context
writeOnceUserUUID []byte writeOnceUserUUID []byte
scheduler *Scheduler
} }
func NewVisionWriter(writer buf.Writer, addon *Addons, state *TrafficState, context context.Context) *VisionWriter { func NewVisionWriter(writer buf.Writer, addon *Addons, state *TrafficState, context context.Context) *VisionWriter {
@ -227,6 +228,7 @@ func NewVisionWriter(writer buf.Writer, addon *Addons, state *TrafficState, cont
trafficState: state, trafficState: state,
ctx: context, ctx: context,
writeOnceUserUUID: w, writeOnceUserUUID: w,
scheduler: NewScheduler(writer, addon, state, context),
} }
} }
@ -279,7 +281,14 @@ func (w *VisionWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
if w.trafficState.StartTime.IsZero() { if w.trafficState.StartTime.IsZero() {
w.trafficState.StartTime = time.Now() w.trafficState.StartTime = time.Now()
} }
return w.Writer.WriteMultiBuffer(mb) w.scheduler.Buffer <- mb
if w.addons.Scheduler == nil {
w.scheduler.Trigger <- -1 // send all buffers
}
if len(w.scheduler.Error) > 0 {
return <-w.scheduler.Error
}
return nil
} }
// ReshapeMultiBuffer prepare multi buffer for padding structure (max 21 bytes) // ReshapeMultiBuffer prepare multi buffer for padding structure (max 21 bytes)

74
proxy/scheduler.go Normal file
View file

@ -0,0 +1,74 @@
package proxy
import (
"context"
"crypto/rand"
"math/big"
"sync"
"time"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/session"
)
type Scheduler struct {
Buffer chan buf.MultiBuffer
Trigger chan int
Error chan error
bufferReadLock *sync.Mutex
writer buf.Writer
addons *Addons
trafficState *TrafficState
ctx context.Context
}
func NewScheduler(w buf.Writer, addon *Addons, state *TrafficState, context context.Context) *Scheduler {
var s = Scheduler{
Buffer: make(chan buf.MultiBuffer, 100),
Trigger: make(chan int),
Error: make(chan error, 100),
bufferReadLock: new(sync.Mutex),
writer: w,
addons: addon,
trafficState: state,
ctx: context,
}
go s.mainLoop()
if s.addons.Scheduler != nil {
go s.exampleIndependentScheduler()
}
return &s
}
func(s *Scheduler) mainLoop() {
for trigger := range s.Trigger {
go func() { // each trigger has independent delay, trigger does not block
var d = 0 * time.Millisecond
if s.addons.Delay != nil {
l, err := rand.Int(rand.Reader, big.NewInt(int64(s.addons.Delay.MaxMillis - s.addons.Delay.MinMillis)))
if err != nil {
newError("failed to generate delay", trigger).Base(err).WriteToLog(session.ExportIDToError(s.ctx))
}
d = time.Duration(uint32(l.Int64()) + s.addons.Delay.MinMillis)
time.Sleep(d * time.Millisecond)
}
s.bufferReadLock.Lock() // guard against multiple trigger threads
var sending = len(s.Buffer)
if sending > 0 {
newError("Scheduler Trigger for ", sending, " buffer(s) with ", d, " ", trigger).AtDebug().WriteToLog(session.ExportIDToError(s.ctx))
}
for i := 0; i<sending; i++ {
s.Error <- s.writer.WriteMultiBuffer(<-s.Buffer)
}
s.bufferReadLock.Unlock()
}()
}
}
func(s *Scheduler) exampleIndependentScheduler() {
for {
time.Sleep(500 * time.Millisecond)
s.Trigger <- -1 // send all buffers
}
}