From abb8ba8b0ebf94b355e19c72ce6506ecd805ee81 Mon Sep 17 00:00:00 2001 From: yuhan6665 <1588741+yuhan6665@users.noreply.github.com> Date: Tue, 26 Oct 2021 01:00:31 -0400 Subject: [PATCH] Observatory related fixes (#788) * fix:observatory not supported by multi-json * Fix: observatory starts with empty config & fails to close (#957) * Update strategy_leastping.go (#1019) * add custom probe URL support for observatory * add custom probe interval for observer * apply coding style * Fix: observatory log & JSON config(#1211) Co-authored-by: ihotte * Change default probe url from api.v2fly.org to www.google.com * Cherry-pick missing code from branch 'dev-advloadblancer-2' Co-authored-by: Shelikhoo Co-authored-by: Loyalsoldier <10487845+Loyalsoldier@users.noreply.github.com> Co-authored-by: fanyiguan <52657276+fanyiguang@users.noreply.github.com> Co-authored-by: ihotte <3087168217@qq.com> Co-authored-by: ihotte --- app/observatory/command/command.go | 50 ++++ app/observatory/command/command.pb.go | 278 ++++++++++++++++++ app/observatory/command/command.proto | 24 ++ app/observatory/command/command_grpc.pb.go | 101 +++++++ app/observatory/observer.go | 4 +- app/proxyman/outbound/handler.go | 8 +- app/router/command/command_test.go | 2 +- app/router/config.go | 22 +- app/router/router.go | 5 +- app/router/router_test.go | 10 +- app/router/strategy_leastping.go | 1 + infra/conf/api.go | 3 + infra/conf/cfgcommon/duration/duration.go | 33 +++ .../conf/cfgcommon/duration/duration_test.go | 33 +++ infra/conf/observatory.go | 10 +- infra/conf/router.go | 22 +- infra/conf/router_strategy.go | 6 + infra/conf/router_test.go | 1 + infra/conf/xray.go | 4 + main/distro/all/all.go | 6 + 20 files changed, 601 insertions(+), 22 deletions(-) create mode 100644 app/observatory/command/command.go create mode 100644 app/observatory/command/command.pb.go create mode 100644 app/observatory/command/command.proto create mode 100644 app/observatory/command/command_grpc.pb.go create mode 100644 infra/conf/cfgcommon/duration/duration.go create mode 100644 infra/conf/cfgcommon/duration/duration_test.go create mode 100644 infra/conf/router_strategy.go diff --git a/app/observatory/command/command.go b/app/observatory/command/command.go new file mode 100644 index 00000000..f929233f --- /dev/null +++ b/app/observatory/command/command.go @@ -0,0 +1,50 @@ +// +build !confonly + +package command + +import ( + "context" + + "google.golang.org/grpc" + + core "github.com/xtls/xray-core/core" + "github.com/xtls/xray-core/app/observatory" + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/features/extension" +) + +type service struct { + UnimplementedObservatoryServiceServer + v *core.Instance + + observatory extension.Observatory +} + +func (s *service) GetOutboundStatus(ctx context.Context, request *GetOutboundStatusRequest) (*GetOutboundStatusResponse, error) { + resp, err := s.observatory.GetObservation(ctx) + if err != nil { + return nil, err + } + retdata := resp.(*observatory.ObservationResult) + return &GetOutboundStatusResponse{ + Status: retdata, + }, nil +} + +func (s *service) Register(server *grpc.Server) { + RegisterObservatoryServiceServer(server, s) +} + +func init() { + common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, cfg interface{}) (interface{}, error) { + s := core.MustFromContext(ctx) + sv := &service{v: s} + err := s.RequireFeatures(func(Observatory extension.Observatory) { + sv.observatory = Observatory + }) + if err != nil { + return nil, err + } + return sv, nil + })) +} diff --git a/app/observatory/command/command.pb.go b/app/observatory/command/command.pb.go new file mode 100644 index 00000000..18d192fd --- /dev/null +++ b/app/observatory/command/command.pb.go @@ -0,0 +1,278 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.18.0 +// source: app/observatory/command/command.proto + +package command + +import ( + observatory "github.com/xtls/xray-core/app/observatory" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type GetOutboundStatusRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *GetOutboundStatusRequest) Reset() { + *x = GetOutboundStatusRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_app_observatory_command_command_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetOutboundStatusRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetOutboundStatusRequest) ProtoMessage() {} + +func (x *GetOutboundStatusRequest) ProtoReflect() protoreflect.Message { + mi := &file_app_observatory_command_command_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetOutboundStatusRequest.ProtoReflect.Descriptor instead. +func (*GetOutboundStatusRequest) Descriptor() ([]byte, []int) { + return file_app_observatory_command_command_proto_rawDescGZIP(), []int{0} +} + +type GetOutboundStatusResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Status *observatory.ObservationResult `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` +} + +func (x *GetOutboundStatusResponse) Reset() { + *x = GetOutboundStatusResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_app_observatory_command_command_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetOutboundStatusResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetOutboundStatusResponse) ProtoMessage() {} + +func (x *GetOutboundStatusResponse) ProtoReflect() protoreflect.Message { + mi := &file_app_observatory_command_command_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetOutboundStatusResponse.ProtoReflect.Descriptor instead. +func (*GetOutboundStatusResponse) Descriptor() ([]byte, []int) { + return file_app_observatory_command_command_proto_rawDescGZIP(), []int{1} +} + +func (x *GetOutboundStatusResponse) GetStatus() *observatory.ObservationResult { + if x != nil { + return x.Status + } + return nil +} + +type Config struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *Config) Reset() { + *x = Config{} + if protoimpl.UnsafeEnabled { + mi := &file_app_observatory_command_command_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Config) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Config) ProtoMessage() {} + +func (x *Config) ProtoReflect() protoreflect.Message { + mi := &file_app_observatory_command_command_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Config.ProtoReflect.Descriptor instead. +func (*Config) Descriptor() ([]byte, []int) { + return file_app_observatory_command_command_proto_rawDescGZIP(), []int{2} +} + +var File_app_observatory_command_command_proto protoreflect.FileDescriptor + +var file_app_observatory_command_command_proto_rawDesc = []byte{ + 0x0a, 0x25, 0x61, 0x70, 0x70, 0x2f, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, + 0x79, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, + 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x21, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, + 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, + 0x72, 0x79, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x1a, 0x1c, 0x61, 0x70, 0x70, 0x2f, + 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0x2f, 0x63, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x1a, 0x0a, 0x18, 0x47, 0x65, 0x74, 0x4f, + 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x22, 0x61, 0x0a, 0x19, 0x47, 0x65, 0x74, 0x4f, 0x75, 0x74, 0x62, 0x6f, + 0x75, 0x6e, 0x64, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x44, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x2c, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, + 0x70, 0x2e, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0x2e, 0x4f, 0x62, + 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, + 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x08, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x32, 0xa7, 0x01, 0x0a, 0x12, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, + 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x90, 0x01, 0x0a, 0x11, 0x47, 0x65, 0x74, + 0x4f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x3b, + 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6f, + 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, + 0x6e, 0x64, 0x2e, 0x47, 0x65, 0x74, 0x4f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x3c, 0x2e, 0x78, 0x72, + 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6f, 0x62, 0x73, 0x65, + 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, + 0x47, 0x65, 0x74, 0x4f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x80, 0x01, 0x0a, 0x25, + 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, + 0x70, 0x2e, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0x2e, 0x63, 0x6f, + 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x50, 0x01, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, + 0x72, 0x65, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, + 0x72, 0x79, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0xaa, 0x02, 0x21, 0x58, 0x72, 0x61, + 0x79, 0x2e, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x4f, 0x62, 0x73, 0x65, 0x72, + 0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_app_observatory_command_command_proto_rawDescOnce sync.Once + file_app_observatory_command_command_proto_rawDescData = file_app_observatory_command_command_proto_rawDesc +) + +func file_app_observatory_command_command_proto_rawDescGZIP() []byte { + file_app_observatory_command_command_proto_rawDescOnce.Do(func() { + file_app_observatory_command_command_proto_rawDescData = protoimpl.X.CompressGZIP(file_app_observatory_command_command_proto_rawDescData) + }) + return file_app_observatory_command_command_proto_rawDescData +} + +var file_app_observatory_command_command_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_app_observatory_command_command_proto_goTypes = []interface{}{ + (*GetOutboundStatusRequest)(nil), // 0: xray.core.app.observatory.command.GetOutboundStatusRequest + (*GetOutboundStatusResponse)(nil), // 1: xray.core.app.observatory.command.GetOutboundStatusResponse + (*Config)(nil), // 2: xray.core.app.observatory.command.Config + (*observatory.ObservationResult)(nil), // 3: xray.core.app.observatory.ObservationResult +} +var file_app_observatory_command_command_proto_depIdxs = []int32{ + 3, // 0: xray.core.app.observatory.command.GetOutboundStatusResponse.status:type_name -> xray.core.app.observatory.ObservationResult + 0, // 1: xray.core.app.observatory.command.ObservatoryService.GetOutboundStatus:input_type -> xray.core.app.observatory.command.GetOutboundStatusRequest + 1, // 2: xray.core.app.observatory.command.ObservatoryService.GetOutboundStatus:output_type -> xray.core.app.observatory.command.GetOutboundStatusResponse + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_app_observatory_command_command_proto_init() } +func file_app_observatory_command_command_proto_init() { + if File_app_observatory_command_command_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_app_observatory_command_command_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetOutboundStatusRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_app_observatory_command_command_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetOutboundStatusResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_app_observatory_command_command_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Config); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_app_observatory_command_command_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_app_observatory_command_command_proto_goTypes, + DependencyIndexes: file_app_observatory_command_command_proto_depIdxs, + MessageInfos: file_app_observatory_command_command_proto_msgTypes, + }.Build() + File_app_observatory_command_command_proto = out.File + file_app_observatory_command_command_proto_rawDesc = nil + file_app_observatory_command_command_proto_goTypes = nil + file_app_observatory_command_command_proto_depIdxs = nil +} diff --git a/app/observatory/command/command.proto b/app/observatory/command/command.proto new file mode 100644 index 00000000..4948809a --- /dev/null +++ b/app/observatory/command/command.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; + +package xray.core.app.observatory.command; +option csharp_namespace = "Xray.Core.App.Observatory.Command"; +option go_package = "github.com/xtls/xray-core/app/observatory/command"; +option java_package = "com.xray.core.app.observatory.command"; +option java_multiple_files = true; + +import "app/observatory/config.proto"; + +message GetOutboundStatusRequest { +} + +message GetOutboundStatusResponse { + xray.core.app.observatory.ObservationResult status = 1; +} + +service ObservatoryService { + rpc GetOutboundStatus(GetOutboundStatusRequest) + returns (GetOutboundStatusResponse) {} +} + + +message Config {} \ No newline at end of file diff --git a/app/observatory/command/command_grpc.pb.go b/app/observatory/command/command_grpc.pb.go new file mode 100644 index 00000000..d66a0484 --- /dev/null +++ b/app/observatory/command/command_grpc.pb.go @@ -0,0 +1,101 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package command + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// ObservatoryServiceClient is the client API for ObservatoryService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type ObservatoryServiceClient interface { + GetOutboundStatus(ctx context.Context, in *GetOutboundStatusRequest, opts ...grpc.CallOption) (*GetOutboundStatusResponse, error) +} + +type observatoryServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewObservatoryServiceClient(cc grpc.ClientConnInterface) ObservatoryServiceClient { + return &observatoryServiceClient{cc} +} + +func (c *observatoryServiceClient) GetOutboundStatus(ctx context.Context, in *GetOutboundStatusRequest, opts ...grpc.CallOption) (*GetOutboundStatusResponse, error) { + out := new(GetOutboundStatusResponse) + err := c.cc.Invoke(ctx, "/xray.core.app.observatory.command.ObservatoryService/GetOutboundStatus", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// ObservatoryServiceServer is the server API for ObservatoryService service. +// All implementations must embed UnimplementedObservatoryServiceServer +// for forward compatibility +type ObservatoryServiceServer interface { + GetOutboundStatus(context.Context, *GetOutboundStatusRequest) (*GetOutboundStatusResponse, error) + mustEmbedUnimplementedObservatoryServiceServer() +} + +// UnimplementedObservatoryServiceServer must be embedded to have forward compatible implementations. +type UnimplementedObservatoryServiceServer struct { +} + +func (UnimplementedObservatoryServiceServer) GetOutboundStatus(context.Context, *GetOutboundStatusRequest) (*GetOutboundStatusResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetOutboundStatus not implemented") +} +func (UnimplementedObservatoryServiceServer) mustEmbedUnimplementedObservatoryServiceServer() {} + +// UnsafeObservatoryServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to ObservatoryServiceServer will +// result in compilation errors. +type UnsafeObservatoryServiceServer interface { + mustEmbedUnimplementedObservatoryServiceServer() +} + +func RegisterObservatoryServiceServer(s grpc.ServiceRegistrar, srv ObservatoryServiceServer) { + s.RegisterService(&ObservatoryService_ServiceDesc, srv) +} + +func _ObservatoryService_GetOutboundStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetOutboundStatusRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ObservatoryServiceServer).GetOutboundStatus(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/xray.core.app.observatory.command.ObservatoryService/GetOutboundStatus", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ObservatoryServiceServer).GetOutboundStatus(ctx, req.(*GetOutboundStatusRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// ObservatoryService_ServiceDesc is the grpc.ServiceDesc for ObservatoryService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var ObservatoryService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "xray.core.app.observatory.command.ObservatoryService", + HandlerType: (*ObservatoryServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetOutboundStatus", + Handler: _ObservatoryService_GetOutboundStatus_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "app/observatory/command/command.proto", +} diff --git a/app/observatory/observer.go b/app/observatory/observer.go index 618ecccb..9ace7bf7 100644 --- a/app/observatory/observer.go +++ b/app/observatory/observer.go @@ -102,7 +102,7 @@ func (o *Observer) probe(outbound string) ProbeResult { DialContext: func(ctx context.Context, network string, addr string) (net.Conn, error) { var connection net.Conn taskErr := task.Run(ctx, func() error { - // MUST use V2Fly's built in context system + // MUST use Xray's built in context system dest, err := v2net.ParseDestination(network + ":" + addr) if err != nil { return newError("cannot understand address").Base(err) @@ -133,7 +133,7 @@ func (o *Observer) probe(outbound string) ProbeResult { var GETTime time.Duration err := task.Run(o.ctx, func() error { startTime := time.Now() - probeURL := "https://api.v2fly.org/checkConnection.svgz" + probeURL := "https://www.google.com/generate_204" if o.config.ProbeUrl != "" { probeURL = o.config.ProbeUrl } diff --git a/app/proxyman/outbound/handler.go b/app/proxyman/outbound/handler.go index d5cc59f1..5f98bb22 100644 --- a/app/proxyman/outbound/handler.go +++ b/app/proxyman/outbound/handler.go @@ -136,13 +136,17 @@ func (h *Handler) Tag() string { func (h *Handler) Dispatch(ctx context.Context, link *transport.Link) { if h.mux != nil && (h.mux.Enabled || session.MuxPreferedFromContext(ctx)) { if err := h.mux.Dispatch(ctx, link); err != nil { - newError("failed to process mux outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx)) + err := newError("failed to process mux outbound traffic").Base(err) + session.SubmitOutboundErrorToOriginator(ctx, err) + err.WriteToLog(session.ExportIDToError(ctx)) common.Interrupt(link.Writer) } } else { if err := h.proxy.Process(ctx, link, h); err != nil { // Ensure outbound ray is properly closed. - newError("failed to process outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx)) + err := newError("failed to process outbound traffic").Base(err) + session.SubmitOutboundErrorToOriginator(ctx, err) + err.WriteToLog(session.ExportIDToError(ctx)) common.Interrupt(link.Writer) } else { common.Must(common.Close(link.Writer)) diff --git a/app/router/command/command_test.go b/app/router/command/command_test.go index a7c97f92..2322b8a3 100644 --- a/app/router/command/command_test.go +++ b/app/router/command/command_test.go @@ -284,7 +284,7 @@ func TestSerivceTestRoute(t *testing.T) { r := new(router.Router) mockCtl := gomock.NewController(t) defer mockCtl.Finish() - common.Must(r.Init(&router.Config{ + common.Must(r.Init(context.TODO(), &router.Config{ Rule: []*router.RoutingRule{ { InboundTag: []string{"in"}, diff --git a/app/router/config.go b/app/router/config.go index 780ff10e..9bb4bc46 100644 --- a/app/router/config.go +++ b/app/router/config.go @@ -158,9 +158,21 @@ func (rr *RoutingRule) BuildCondition() (Condition, error) { } func (br *BalancingRule) Build(ohm outbound.Manager) (*Balancer, error) { - return &Balancer{ - selectors: br.OutboundSelector, - strategy: &RandomStrategy{}, - ohm: ohm, - }, nil + switch br.Strategy { + case "leastPing": + return &Balancer{ + selectors: br.OutboundSelector, + strategy: &LeastPingStrategy{}, + ohm: ohm, + }, nil + case "random": + fallthrough + default: + return &Balancer{ + selectors: br.OutboundSelector, + strategy: &RandomStrategy{}, + ohm: ohm, + }, nil + + } } diff --git a/app/router/router.go b/app/router/router.go index 2a2482a6..99b42c49 100644 --- a/app/router/router.go +++ b/app/router/router.go @@ -29,7 +29,7 @@ type Route struct { } // Init initializes the Router. -func (r *Router) Init(config *Config, d dns.Client, ohm outbound.Manager) error { +func (r *Router) Init(ctx context.Context, config *Config, d dns.Client, ohm outbound.Manager) error { r.domainStrategy = config.DomainStrategy r.dns = d @@ -39,6 +39,7 @@ func (r *Router) Init(config *Config, d dns.Client, ohm outbound.Manager) error if err != nil { return err } + balancer.InjectContext(ctx) r.balancers[rule.Tag] = balancer } @@ -140,7 +141,7 @@ func init() { common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { r := new(Router) if err := core.RequireFeatures(ctx, func(d dns.Client, ohm outbound.Manager) error { - return r.Init(config.(*Config), d, ohm) + return r.Init(ctx, config.(*Config), d, ohm) }); err != nil { return nil, err } diff --git a/app/router/router_test.go b/app/router/router_test.go index 93b5fb19..a4f973e5 100644 --- a/app/router/router_test.go +++ b/app/router/router_test.go @@ -42,7 +42,7 @@ func TestSimpleRouter(t *testing.T) { mockHs := mocks.NewOutboundHandlerSelector(mockCtl) r := new(Router) - common.Must(r.Init(config, mockDNS, &mockOutboundManager{ + common.Must(r.Init(context.TODO(), config, mockDNS, &mockOutboundManager{ Manager: mockOhm, HandlerSelector: mockHs, })) @@ -83,7 +83,7 @@ func TestSimpleBalancer(t *testing.T) { mockHs.EXPECT().Select(gomock.Eq([]string{"test-"})).Return([]string{"test"}) r := new(Router) - common.Must(r.Init(config, mockDNS, &mockOutboundManager{ + common.Must(r.Init(context.TODO(), config, mockDNS, &mockOutboundManager{ Manager: mockOhm, HandlerSelector: mockHs, })) @@ -125,7 +125,7 @@ func TestIPOnDemand(t *testing.T) { }).Return([]net.IP{{192, 168, 0, 1}}, nil).AnyTimes() r := new(Router) - common.Must(r.Init(config, mockDNS, nil)) + common.Must(r.Init(context.TODO(), config, mockDNS, nil)) ctx := session.ContextWithOutbound(context.Background(), &session.Outbound{Target: net.TCPDestination(net.DomainAddress("example.com"), 80)}) route, err := r.PickRoute(routing_session.AsRoutingContext(ctx)) @@ -164,7 +164,7 @@ func TestIPIfNonMatchDomain(t *testing.T) { }).Return([]net.IP{{192, 168, 0, 1}}, nil).AnyTimes() r := new(Router) - common.Must(r.Init(config, mockDNS, nil)) + common.Must(r.Init(context.TODO(), config, mockDNS, nil)) ctx := session.ContextWithOutbound(context.Background(), &session.Outbound{Target: net.TCPDestination(net.DomainAddress("example.com"), 80)}) route, err := r.PickRoute(routing_session.AsRoutingContext(ctx)) @@ -198,7 +198,7 @@ func TestIPIfNonMatchIP(t *testing.T) { mockDNS := mocks.NewDNSClient(mockCtl) r := new(Router) - common.Must(r.Init(config, mockDNS, nil)) + common.Must(r.Init(context.TODO(), config, mockDNS, nil)) ctx := session.ContextWithOutbound(context.Background(), &session.Outbound{Target: net.TCPDestination(net.LocalHostIP, 80)}) route, err := r.PickRoute(routing_session.AsRoutingContext(ctx)) diff --git a/app/router/strategy_leastping.go b/app/router/strategy_leastping.go index e970e7f8..ddb5116a 100644 --- a/app/router/strategy_leastping.go +++ b/app/router/strategy_leastping.go @@ -36,6 +36,7 @@ func (l *LeastPingStrategy) PickOutbound(strings []string) string { for _, v := range status { if outboundsList.contains(v.OutboundTag) && v.Alive && v.Delay < leastPing { selectedOutboundName = v.OutboundTag + leastPing = v.Delay } } return selectedOutboundName diff --git a/infra/conf/api.go b/infra/conf/api.go index 02bc77dc..879fc328 100644 --- a/infra/conf/api.go +++ b/infra/conf/api.go @@ -5,6 +5,7 @@ import ( "github.com/xtls/xray-core/app/commander" loggerservice "github.com/xtls/xray-core/app/log/command" + observatoryservice "github.com/xtls/xray-core/app/observatory/command" handlerservice "github.com/xtls/xray-core/app/proxyman/command" statsservice "github.com/xtls/xray-core/app/stats/command" "github.com/xtls/xray-core/common/serial" @@ -31,6 +32,8 @@ func (c *APIConfig) Build() (*commander.Config, error) { services = append(services, serial.ToTypedMessage(&loggerservice.Config{})) case "statsservice": services = append(services, serial.ToTypedMessage(&statsservice.Config{})) + case "observatoryservice": + services = append(services, serial.ToTypedMessage(&observatoryservice.Config{})) } } diff --git a/infra/conf/cfgcommon/duration/duration.go b/infra/conf/cfgcommon/duration/duration.go new file mode 100644 index 00000000..aed8e613 --- /dev/null +++ b/infra/conf/cfgcommon/duration/duration.go @@ -0,0 +1,33 @@ +package duration + +import ( + "encoding/json" + "fmt" + "time" +) + +type Duration int64 + +func (d *Duration) MarshalJSON() ([]byte, error) { + dr := time.Duration(*d) + return json.Marshal(dr.String()) +} + +func (d *Duration) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + switch value := v.(type) { + case string: + var err error + dr, err := time.ParseDuration(value) + if err != nil { + return err + } + *d = Duration(dr) + return nil + default: + return fmt.Errorf("invalid duration: %v", v) + } +} diff --git a/infra/conf/cfgcommon/duration/duration_test.go b/infra/conf/cfgcommon/duration/duration_test.go new file mode 100644 index 00000000..d504531f --- /dev/null +++ b/infra/conf/cfgcommon/duration/duration_test.go @@ -0,0 +1,33 @@ +package duration_test + +import ( + "encoding/json" + "testing" + "time" + + "github.com/xtls/xray-core/infra/conf/cfgcommon/duration" +) + +type testWithDuration struct { + Duration duration.Duration +} + +func TestDurationJSON(t *testing.T) { + expected := &testWithDuration{ + Duration: duration.Duration(time.Hour), + } + data, err := json.Marshal(expected) + if err != nil { + t.Error(err) + return + } + actual := &testWithDuration{} + err = json.Unmarshal(data, &actual) + if err != nil { + t.Error(err) + return + } + if actual.Duration != expected.Duration { + t.Errorf("expected: %s, actual: %s", time.Duration(expected.Duration), time.Duration(actual.Duration)) + } +} diff --git a/infra/conf/observatory.go b/infra/conf/observatory.go index 70624808..9593e7f2 100644 --- a/infra/conf/observatory.go +++ b/infra/conf/observatory.go @@ -2,13 +2,17 @@ package conf import ( "github.com/golang/protobuf/proto" + "github.com/xtls/xray-core/app/observatory" + "github.com/xtls/xray-core/infra/conf/cfgcommon/duration" ) type ObservatoryConfig struct { - SubjectSelector []string `json:"subjectSelector"` + SubjectSelector []string `json:"subjectSelector"` + ProbeURL string `json:"probeURL"` + ProbeInterval duration.Duration `json:"probeInterval"` } -func (o ObservatoryConfig) Build() (proto.Message, error) { - return &observatory.Config{SubjectSelector: o.SubjectSelector}, nil +func (o *ObservatoryConfig) Build() (proto.Message, error) { + return &observatory.Config{SubjectSelector: o.SubjectSelector, ProbeUrl: o.ProbeURL, ProbeInterval: int64(o.ProbeInterval)}, nil } diff --git a/infra/conf/router.go b/infra/conf/router.go index 8b60a9e0..013a8882 100644 --- a/infra/conf/router.go +++ b/infra/conf/router.go @@ -18,9 +18,16 @@ type RouterRulesConfig struct { DomainStrategy string `json:"domainStrategy"` } +// StrategyConfig represents a strategy config +type StrategyConfig struct { + Type string `json:"type"` + Settings *json.RawMessage `json:"settings"` +} + type BalancingRule struct { - Tag string `json:"tag"` - Selectors StringList `json:"selector"` + Tag string `json:"tag"` + Selectors StringList `json:"selector"` + Strategy StrategyConfig `json:"strategy"` } func (r *BalancingRule) Build() (*router.BalancingRule, error) { @@ -31,9 +38,20 @@ func (r *BalancingRule) Build() (*router.BalancingRule, error) { return nil, newError("empty selector list") } + var strategy string + switch strings.ToLower(r.Strategy.Type) { + case strategyRandom, "": + strategy = strategyRandom + case strategyLeastPing: + strategy = "leastPing" + default: + return nil, newError("unknown balancing strategy: " + r.Strategy.Type) + } + return &router.BalancingRule{ Tag: r.Tag, OutboundSelector: []string(r.Selectors), + Strategy: strategy, }, nil } diff --git a/infra/conf/router_strategy.go b/infra/conf/router_strategy.go new file mode 100644 index 00000000..b8536330 --- /dev/null +++ b/infra/conf/router_strategy.go @@ -0,0 +1,6 @@ +package conf + +const ( + strategyRandom string = "random" + strategyLeastPing string = "leastping" +) diff --git a/infra/conf/router_test.go b/infra/conf/router_test.go index 2c828f57..4a26c3e0 100644 --- a/infra/conf/router_test.go +++ b/infra/conf/router_test.go @@ -107,6 +107,7 @@ func TestRouterConfig(t *testing.T) { { Tag: "b1", OutboundSelector: []string{"test"}, + Strategy: "random", }, }, Rule: []*router.RoutingRule{ diff --git a/infra/conf/xray.go b/infra/conf/xray.go index eaed946d..20125a14 100644 --- a/infra/conf/xray.go +++ b/infra/conf/xray.go @@ -464,6 +464,10 @@ func (c *Config) Override(o *Config, fn string) { c.FakeDNS = o.FakeDNS } + if o.Observatory != nil { + c.Observatory = o.Observatory + } + // deprecated attrs... keep them for now if o.InboundConfig != nil { c.InboundConfig = o.InboundConfig diff --git a/main/distro/all/all.go b/main/distro/all/all.go index 3653329a..cc1dcacf 100644 --- a/main/distro/all/all.go +++ b/main/distro/all/all.go @@ -14,6 +14,9 @@ import ( _ "github.com/xtls/xray-core/app/proxyman/command" _ "github.com/xtls/xray-core/app/stats/command" + // Developer preview services + _ "github.com/xtls/xray-core/app/observatory/command" + // Other optional features. _ "github.com/xtls/xray-core/app/dns" _ "github.com/xtls/xray-core/app/dns/fakedns" @@ -26,6 +29,9 @@ import ( // Fix dependency cycle caused by core import in internet package _ "github.com/xtls/xray-core/transport/internet/tagged/taggedimpl" + // Developer preview features + _ "github.com/xtls/xray-core/app/observatory" + // Inbound and outbound proxies. _ "github.com/xtls/xray-core/proxy/blackhole" _ "github.com/xtls/xray-core/proxy/dns"