From 28b17b529de6b855c1c50b6084030ebd8edda499 Mon Sep 17 00:00:00 2001 From: Zhu Sheng Li Date: Fri, 27 Aug 2021 01:01:10 +0800 Subject: [PATCH] Add concurrency option for outbound observation Add `enableConcurrency` option, false by default. If it's set as `true`, start probing outbounds concurrently in every circle of observation. Wait `probeInterval` between observation circles. --- app/observatory/observer.go | 41 +++++++++++++++++++++++++++++-------- infra/conf/observatory.go | 11 +++++----- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/app/observatory/observer.go b/app/observatory/observer.go index 9ace7bf7..b752dcfa 100644 --- a/app/observatory/observer.go +++ b/app/observatory/observer.go @@ -66,22 +66,45 @@ func (o *Observer) background() { } outbounds := hs.Select(o.config.SubjectSelector) - sort.Strings(outbounds) o.updateStatus(outbounds) + sleepTime := time.Second * 10 + if o.config.ProbeInterval != 0 { + sleepTime = time.Duration(o.config.ProbeInterval) + } + + if !o.config.EnableConcurrency { + sort.Strings(outbounds) + for _, v := range outbounds { + result := o.probe(v) + o.updateStatusForResult(v, &result) + if o.finished.Done() { + return + } + time.Sleep(sleepTime) + } + continue + } + + ch := make(chan struct{}, len(outbounds)) + for _, v := range outbounds { - result := o.probe(v) - o.updateStatusForResult(v, &result) - if o.finished.Done() { + go func(v string) { + result := o.probe(v) + o.updateStatusForResult(v, &result) + ch <- struct{}{} + }(v) + } + + for range outbounds { + select { + case <-ch: + case <-o.finished.Wait(): return } - sleepTime := time.Second * 10 - if o.config.ProbeInterval != 0 { - sleepTime = time.Duration(o.config.ProbeInterval) - } - time.Sleep(sleepTime) } + time.Sleep(sleepTime) } } diff --git a/infra/conf/observatory.go b/infra/conf/observatory.go index 9593e7f2..6dce81ca 100644 --- a/infra/conf/observatory.go +++ b/infra/conf/observatory.go @@ -2,17 +2,18 @@ package conf import ( "github.com/golang/protobuf/proto" - + "github.com/xtls/xray-core/app/observatory" "github.com/xtls/xray-core/infra/conf/cfgcommon/duration" ) type ObservatoryConfig struct { - SubjectSelector []string `json:"subjectSelector"` - ProbeURL string `json:"probeURL"` - ProbeInterval duration.Duration `json:"probeInterval"` + SubjectSelector []string `json:"subjectSelector"` + ProbeURL string `json:"probeURL"` + ProbeInterval duration.Duration `json:"probeInterval"` + EnableConcurrency bool `json:"enableConcurrency"` } func (o *ObservatoryConfig) Build() (proto.Message, error) { - return &observatory.Config{SubjectSelector: o.SubjectSelector, ProbeUrl: o.ProbeURL, ProbeInterval: int64(o.ProbeInterval)}, nil + return &observatory.Config{SubjectSelector: o.SubjectSelector, ProbeUrl: o.ProbeURL, ProbeInterval: int64(o.ProbeInterval), EnableConcurrency: o.EnableConcurrency}, nil }