Xray-core/app/reverse/portal.go
yuhan6665 079d0bd8a9
Refactor log (#3446)
* Refactor log

* Add new log methods

* Fix logger test

* Change all logging code

* Clean up pathObj

* Rebase to latest main

* Remove invoking method name after the dot
2024-06-29 14:32:57 -04:00

270 lines
5.5 KiB
Go

package reverse
import (
"context"
"sync"
"time"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/mux"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/task"
"github.com/xtls/xray-core/features/outbound"
"github.com/xtls/xray-core/transport"
"github.com/xtls/xray-core/transport/pipe"
"google.golang.org/protobuf/proto"
)
type Portal struct {
ohm outbound.Manager
tag string
domain string
picker *StaticMuxPicker
client *mux.ClientManager
}
func NewPortal(config *PortalConfig, ohm outbound.Manager) (*Portal, error) {
if config.Tag == "" {
return nil, errors.New("portal tag is empty")
}
if config.Domain == "" {
return nil, errors.New("portal domain is empty")
}
picker, err := NewStaticMuxPicker()
if err != nil {
return nil, err
}
return &Portal{
ohm: ohm,
tag: config.Tag,
domain: config.Domain,
picker: picker,
client: &mux.ClientManager{
Picker: picker,
},
}, nil
}
func (p *Portal) Start() error {
return p.ohm.AddHandler(context.Background(), &Outbound{
portal: p,
tag: p.tag,
})
}
func (p *Portal) Close() error {
return p.ohm.RemoveHandler(context.Background(), p.tag)
}
func (p *Portal) HandleConnection(ctx context.Context, link *transport.Link) error {
outbounds := session.OutboundsFromContext(ctx)
ob := outbounds[len(outbounds)-1]
if ob == nil {
return errors.New("outbound metadata not found").AtError()
}
if isDomain(ob.Target, p.domain) {
muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{})
if err != nil {
return errors.New("failed to create mux client worker").Base(err).AtWarning()
}
worker, err := NewPortalWorker(muxClient)
if err != nil {
return errors.New("failed to create portal worker").Base(err)
}
p.picker.AddWorker(worker)
return nil
}
return p.client.Dispatch(ctx, link)
}
type Outbound struct {
portal *Portal
tag string
}
func (o *Outbound) Tag() string {
return o.tag
}
func (o *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
if err := o.portal.HandleConnection(ctx, link); err != nil {
errors.LogInfoInner(ctx, err, "failed to process reverse connection")
common.Interrupt(link.Writer)
}
}
func (o *Outbound) Start() error {
return nil
}
func (o *Outbound) Close() error {
return nil
}
type StaticMuxPicker struct {
access sync.Mutex
workers []*PortalWorker
cTask *task.Periodic
}
func NewStaticMuxPicker() (*StaticMuxPicker, error) {
p := &StaticMuxPicker{}
p.cTask = &task.Periodic{
Execute: p.cleanup,
Interval: time.Second * 30,
}
p.cTask.Start()
return p, nil
}
func (p *StaticMuxPicker) cleanup() error {
p.access.Lock()
defer p.access.Unlock()
var activeWorkers []*PortalWorker
for _, w := range p.workers {
if !w.Closed() {
activeWorkers = append(activeWorkers, w)
}
}
if len(activeWorkers) != len(p.workers) {
p.workers = activeWorkers
}
return nil
}
func (p *StaticMuxPicker) PickAvailable() (*mux.ClientWorker, error) {
p.access.Lock()
defer p.access.Unlock()
if len(p.workers) == 0 {
return nil, errors.New("empty worker list")
}
var minIdx int = -1
var minConn uint32 = 9999
for i, w := range p.workers {
if w.draining {
continue
}
if w.client.Closed() {
continue
}
if w.client.ActiveConnections() < minConn {
minConn = w.client.ActiveConnections()
minIdx = i
}
}
if minIdx == -1 {
for i, w := range p.workers {
if w.IsFull() {
continue
}
if w.client.ActiveConnections() < minConn {
minConn = w.client.ActiveConnections()
minIdx = i
}
}
}
if minIdx != -1 {
return p.workers[minIdx].client, nil
}
return nil, errors.New("no mux client worker available")
}
func (p *StaticMuxPicker) AddWorker(worker *PortalWorker) {
p.access.Lock()
defer p.access.Unlock()
p.workers = append(p.workers, worker)
}
type PortalWorker struct {
client *mux.ClientWorker
control *task.Periodic
writer buf.Writer
reader buf.Reader
draining bool
}
func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) {
opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)}
uplinkReader, uplinkWriter := pipe.New(opt...)
downlinkReader, downlinkWriter := pipe.New(opt...)
ctx := context.Background()
outbounds := []*session.Outbound{{
Target: net.UDPDestination(net.DomainAddress(internalDomain), 0),
}}
ctx = session.ContextWithOutbounds(ctx, outbounds)
f := client.Dispatch(ctx, &transport.Link{
Reader: uplinkReader,
Writer: downlinkWriter,
})
if !f {
return nil, errors.New("unable to dispatch control connection")
}
w := &PortalWorker{
client: client,
reader: downlinkReader,
writer: uplinkWriter,
}
w.control = &task.Periodic{
Execute: w.heartbeat,
Interval: time.Second * 2,
}
w.control.Start()
return w, nil
}
func (w *PortalWorker) heartbeat() error {
if w.client.Closed() {
return errors.New("client worker stopped")
}
if w.draining || w.writer == nil {
return errors.New("already disposed")
}
msg := &Control{}
msg.FillInRandom()
if w.client.TotalConnections() > 256 {
w.draining = true
msg.State = Control_DRAIN
defer func() {
common.Close(w.writer)
common.Interrupt(w.reader)
w.writer = nil
}()
}
b, err := proto.Marshal(msg)
common.Must(err)
mb := buf.MergeBytes(nil, b)
return w.writer.WriteMultiBuffer(mb)
}
func (w *PortalWorker) IsFull() bool {
return w.client.IsFull()
}
func (w *PortalWorker) Closed() bool {
return w.client.Closed()
}