From 84eeb56ae45f48b9ab5166abefb2fda40986b8a8 Mon Sep 17 00:00:00 2001 From: yuhan6665 <1588741+yuhan6665@users.noreply.github.com> Date: Sun, 5 May 2024 10:07:40 -0400 Subject: [PATCH] Select alive only node when fallbackTag is given - Apply to random and roundrobin strategy - Require observatory config Co-authored-by: Mark Ma <38940419+mkmark@users.noreply.github.com> --- app/router/balancing.go | 49 ++++++++++++++++++++++++++++++++++- app/router/config.go | 4 +-- app/router/strategy_random.go | 48 +++++++++++++++++++++++++++++++++- infra/conf/router_test.go | 4 ++- 4 files changed, 100 insertions(+), 5 deletions(-) diff --git a/app/router/balancing.go b/app/router/balancing.go index 7eb8c5a8..c2eefe84 100644 --- a/app/router/balancing.go +++ b/app/router/balancing.go @@ -4,6 +4,9 @@ import ( "context" sync "sync" + "github.com/xtls/xray-core/app/observatory" + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/core" "github.com/xtls/xray-core/features/extension" "github.com/xtls/xray-core/features/outbound" ) @@ -17,14 +20,58 @@ type BalancingPrincipleTarget interface { } type RoundRobinStrategy struct { + FallbackTag string + + ctx context.Context + observatory extension.Observatory mu sync.Mutex index int } +func (s *RoundRobinStrategy) InjectContext(ctx context.Context) { + s.ctx = ctx +} + +func (s *RoundRobinStrategy) GetPrincipleTarget(strings []string) []string { + return strings +} + func (s *RoundRobinStrategy) PickOutbound(tags []string) string { + if len(s.FallbackTag) > 0 && s.observatory == nil { + common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error { + s.observatory = observatory + return nil + })) + } + if s.observatory != nil { + observeReport, err := s.observatory.GetObservation(s.ctx) + if err == nil { + aliveTags := make([]string, 0) + if result, ok := observeReport.(*observatory.ObservationResult); ok { + status := result.Status + statusMap := make(map[string]*observatory.OutboundStatus) + for _, outboundStatus := range status { + statusMap[outboundStatus.OutboundTag] = outboundStatus + } + for _, candidate := range tags { + if outboundStatus, found := statusMap[candidate]; found { + if outboundStatus.Alive { + aliveTags = append(aliveTags, candidate) + } + } else { + // unfound candidate is considered alive + aliveTags = append(aliveTags, candidate) + } + } + tags = aliveTags + } + } + } + n := len(tags) if n == 0 { - panic("0 tags") + // goes to fallbackTag + return "" } s.mu.Lock() diff --git a/app/router/config.go b/app/router/config.go index 690064bd..e7c2af67 100644 --- a/app/router/config.go +++ b/app/router/config.go @@ -135,7 +135,7 @@ func (br *BalancingRule) Build(ohm outbound.Manager, dispatcher routing.Dispatch case "roundrobin": return &Balancer{ selectors: br.OutboundSelector, - strategy: &RoundRobinStrategy{}, + strategy: &RoundRobinStrategy{FallbackTag: br.FallbackTag}, fallbackTag: br.FallbackTag, ohm: ohm, }, nil @@ -162,7 +162,7 @@ func (br *BalancingRule) Build(ohm outbound.Manager, dispatcher routing.Dispatch selectors: br.OutboundSelector, ohm: ohm, fallbackTag: br.FallbackTag, - strategy: &RandomStrategy{}, + strategy: &RandomStrategy{FallbackTag: br.FallbackTag}, }, nil default: return nil, newError("unrecognized balancer type") diff --git a/app/router/strategy_random.go b/app/router/strategy_random.go index 0bfe3ab6..2609e347 100644 --- a/app/router/strategy_random.go +++ b/app/router/strategy_random.go @@ -1,17 +1,63 @@ package router import ( + "context" + + "github.com/xtls/xray-core/app/observatory" + "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/dice" + "github.com/xtls/xray-core/core" + "github.com/xtls/xray-core/features/extension" ) // RandomStrategy represents a random balancing strategy -type RandomStrategy struct{} +type RandomStrategy struct{ + FallbackTag string + + ctx context.Context + observatory extension.Observatory +} + +func (s *RandomStrategy) InjectContext(ctx context.Context) { + s.ctx = ctx +} func (s *RandomStrategy) GetPrincipleTarget(strings []string) []string { return strings } func (s *RandomStrategy) PickOutbound(candidates []string) string { + if len(s.FallbackTag) > 0 && s.observatory == nil { + common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error { + s.observatory = observatory + return nil + })) + } + if s.observatory != nil { + observeReport, err := s.observatory.GetObservation(s.ctx) + if err == nil { + aliveTags := make([]string, 0) + if result, ok := observeReport.(*observatory.ObservationResult); ok { + status := result.Status + statusMap := make(map[string]*observatory.OutboundStatus) + for _, outboundStatus := range status { + statusMap[outboundStatus.OutboundTag] = outboundStatus + } + for _, candidate := range candidates { + if outboundStatus, found := statusMap[candidate]; found { + if outboundStatus.Alive { + aliveTags = append(aliveTags, candidate) + } + } else { + // unfound candidate is considered alive + aliveTags = append(aliveTags, candidate) + } + } + candidates = aliveTags + } + } + } + count := len(candidates) if count == 0 { // goes to fallbackTag diff --git a/infra/conf/router_test.go b/infra/conf/router_test.go index 0af1b3e3..de4c8ed2 100644 --- a/infra/conf/router_test.go +++ b/infra/conf/router_test.go @@ -97,7 +97,8 @@ func TestRouterConfig(t *testing.T) { "balancers": [ { "tag": "b1", - "selector": ["test"] + "selector": ["test"], + "fallbackTag": "fall" }, { "tag": "b2", @@ -137,6 +138,7 @@ func TestRouterConfig(t *testing.T) { Tag: "b1", OutboundSelector: []string{"test"}, Strategy: "random", + FallbackTag: "fall", }, { Tag: "b2",