Xray-core/app/proxyman/outbound/handler_test.go
nobody d20a835016
Fix concurrent map writes error in ohm.Select(). (#2943)
* Add unit test for ohm.tagsCache.

* Fix concurrent map writes in ohm.Select().

---------

Co-authored-by: nobody <nobody@nowhere.mars>
2024-01-16 10:52:01 -05:00

174 lines
4.2 KiB
Go

package outbound_test
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/xtls/xray-core/app/policy"
"github.com/xtls/xray-core/app/proxyman"
. "github.com/xtls/xray-core/app/proxyman/outbound"
"github.com/xtls/xray-core/app/stats"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/serial"
core "github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/outbound"
"github.com/xtls/xray-core/proxy/freedom"
"github.com/xtls/xray-core/transport/internet/stat"
)
func TestInterfaces(t *testing.T) {
_ = (outbound.Handler)(new(Handler))
_ = (outbound.Manager)(new(Manager))
}
const xrayKey core.XrayKey = 1
func TestOutboundWithoutStatCounter(t *testing.T) {
config := &core.Config{
App: []*serial.TypedMessage{
serial.ToTypedMessage(&stats.Config{}),
serial.ToTypedMessage(&policy.Config{
System: &policy.SystemPolicy{
Stats: &policy.SystemPolicy_Stats{
InboundUplink: true,
},
},
}),
},
}
v, _ := core.New(config)
v.AddFeature((outbound.Manager)(new(Manager)))
ctx := context.WithValue(context.Background(), xrayKey, v)
h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{
Tag: "tag",
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
})
conn, _ := h.(*Handler).Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), 13146))
_, ok := conn.(*stat.CounterConnection)
if ok {
t.Errorf("Expected conn to not be CounterConnection")
}
}
func TestOutboundWithStatCounter(t *testing.T) {
config := &core.Config{
App: []*serial.TypedMessage{
serial.ToTypedMessage(&stats.Config{}),
serial.ToTypedMessage(&policy.Config{
System: &policy.SystemPolicy{
Stats: &policy.SystemPolicy_Stats{
OutboundUplink: true,
OutboundDownlink: true,
},
},
}),
},
}
v, _ := core.New(config)
v.AddFeature((outbound.Manager)(new(Manager)))
ctx := context.WithValue(context.Background(), xrayKey, v)
h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{
Tag: "tag",
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
})
conn, _ := h.(*Handler).Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), 13146))
_, ok := conn.(*stat.CounterConnection)
if !ok {
t.Errorf("Expected conn to be CounterConnection")
}
}
func TestTagsCache(t *testing.T) {
test_duration := 10 * time.Second
threads_num := 50
delay := 10 * time.Millisecond
tags_prefix := "node"
tags := sync.Map{}
counter := atomic.Uint64{}
ohm, err := New(context.Background(), &proxyman.OutboundConfig{})
if err != nil {
t.Error("failed to create outbound handler manager")
}
config := &core.Config{
App: []*serial.TypedMessage{},
}
v, _ := core.New(config)
v.AddFeature(ohm)
ctx := context.WithValue(context.Background(), xrayKey, v)
stop_add_rm := false
wg_add_rm := sync.WaitGroup{}
addHandlers := func() {
defer wg_add_rm.Done()
for !stop_add_rm {
time.Sleep(delay)
idx := counter.Add(1)
tag := fmt.Sprintf("%s%d", tags_prefix, idx)
cfg := &core.OutboundHandlerConfig{
Tag: tag,
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
}
if h, err := NewHandler(ctx, cfg); err == nil {
if err := ohm.AddHandler(ctx, h); err == nil {
// t.Log("add handler:", tag)
tags.Store(tag, nil)
} else {
t.Error("failed to add handler:", tag)
}
} else {
t.Error("failed to create handler:", tag)
}
}
}
rmHandlers := func() {
defer wg_add_rm.Done()
for !stop_add_rm {
time.Sleep(delay)
tags.Range(func(key interface{}, value interface{}) bool {
if _, ok := tags.LoadAndDelete(key); ok {
// t.Log("remove handler:", key)
ohm.RemoveHandler(ctx, key.(string))
return false
}
return true
})
}
}
selectors := []string{tags_prefix}
wg_get := sync.WaitGroup{}
stop_get := false
getTags := func() {
defer wg_get.Done()
for !stop_get {
time.Sleep(delay)
_ = ohm.Select(selectors)
// t.Logf("get tags: %v", tag)
}
}
for i := 0; i < threads_num; i++ {
wg_add_rm.Add(2)
go rmHandlers()
go addHandlers()
wg_get.Add(1)
go getTags()
}
time.Sleep(test_duration)
stop_add_rm = true
wg_add_rm.Wait()
stop_get = true
wg_get.Wait()
}