diff --git a/cmd/seq-ui/main.go b/cmd/seq-ui/main.go index 8d7a543..d661fcd 100644 --- a/cmd/seq-ui/main.go +++ b/cmd/seq-ui/main.go @@ -163,7 +163,7 @@ func initApp(ctx context.Context, cfg config.Config) *api.Registrar { userProfileV1 = userprofile_v1.New(svc, p) dashboardsV1 = dashboards_v1.New(svc, p) - asyncSearchesService = asyncsearches.New(ctx, repo, defaultClient, cfg.Handlers.AsyncSearch.AdminUsers) + asyncSearchesService = asyncsearches.New(ctx, repo, defaultClient, cfg.Handlers.AsyncSearch) } seqApiV1 := seqapi_v1.New(cfg.Handlers.SeqAPI, seqDBClients, inmemWithRedisCache, redisCache, asyncSearchesService, p) diff --git a/docs/en/02-configuration.md b/docs/en/02-configuration.md index 80bd9fa..48b30f6 100644 --- a/docs/en/02-configuration.md +++ b/docs/en/02-configuration.md @@ -506,6 +506,7 @@ handlers: seq_api: error_groups: mass_export: + async_search: ``` ### SeqAPI @@ -812,6 +813,22 @@ Config for `/massexport` API handlers. > The value must be passed in the duration format: `(ms|s|m|h)`. +### Async Search + +Configuration for async search request. + +**`async_search`** *`AsyncSearch`* *`optional`* + +`AsyncSearch` fields: + ++ **`admin_users`** *`[]string`* *`optional`* + + List of users allowed to cancel or delete async requests created by other users. + ++ **`list_query_length_limit`** *`int`* *`default=1000`* + + Maximum length of `request.query` in async searches list responses. Requests exceeding the limit will be truncated to it + ## Tracing The tracing configuration is set through environment variables. diff --git a/docs/ru/02-configuration.md b/docs/ru/02-configuration.md index 3a8ff50..65ecfee 100644 --- a/docs/ru/02-configuration.md +++ b/docs/ru/02-configuration.md @@ -506,6 +506,7 @@ handlers: seq_api: error_groups: mass_export: + async_search: ``` ### SeqAPI @@ -812,6 +813,22 @@ handlers: > Значение должно быть передано в `duration`-формате: `<число>(ms|s|m|h)`. +### Async Search + +**`async_search`** *`AsyncSearch`* *`optional`* + +Настройка функциональности отложенных поисковых запросов. + +Поля `AsyncSearch`: + ++ **`admin_users`** *`[]string`* *`optional`* + + Список пользователей, которые могут отменять или удалять отложенные поиски других пользователей. + ++ **`list_query_length_limit`** *`int`* *`default=1000`* + + Максимальная длина `request.query` в ответе списка отложенных запросов. Запросы, превышающие лимит, будут обрезаны до этого значения. + ## Tracing Конфигурация трейсинга задается переменными окружения. diff --git a/internal/api/seqapi/v1/grpc/test_data.go b/internal/api/seqapi/v1/grpc/test_data.go index 778d702..4b605ea 100644 --- a/internal/api/seqapi/v1/grpc/test_data.go +++ b/internal/api/seqapi/v1/grpc/test_data.go @@ -34,7 +34,7 @@ func initTestAPIWithAsyncSearches(data test.APITestData) *API { seqDBClients := map[string]seqdb.Client{ config.DefaultSeqDBClientID: data.Mocks.SeqDB, } - as := asyncsearches.New(context.Background(), data.Mocks.AsyncSearchesRepo, data.Mocks.SeqDB, []string{}) + as := asyncsearches.New(context.Background(), data.Mocks.AsyncSearchesRepo, data.Mocks.SeqDB, data.AsyncCfg) s := service.New(&repository.Repository{ UserProfiles: data.Mocks.ProfilesRepo, }) diff --git a/internal/api/seqapi/v1/http/get_async_searches_list_test.go b/internal/api/seqapi/v1/http/get_async_searches_list_test.go index f82fd89..1811ae6 100644 --- a/internal/api/seqapi/v1/http/get_async_searches_list_test.go +++ b/internal/api/seqapi/v1/http/get_async_searches_list_test.go @@ -13,6 +13,7 @@ import ( "github.com/ozontech/seq-ui/internal/api/httputil" "github.com/ozontech/seq-ui/internal/api/seqapi/v1/test" + "github.com/ozontech/seq-ui/internal/app/config" "github.com/ozontech/seq-ui/internal/app/types" mock_seqdb "github.com/ozontech/seq-ui/internal/pkg/client/seqdb/mock" mock_repo "github.com/ozontech/seq-ui/internal/pkg/repository/mock" @@ -28,8 +29,9 @@ func TestServeGetAsyncSearchesList(t *testing.T) { mockProfileID1 int64 = 1 mockProfileID2 int64 = 1 errorMsg = "some error" - - mockTime = time.Date(2025, 8, 6, 17, 52, 12, 123, time.UTC) + tooLongQuery = strings.Repeat("message:error and level:3", 41) + TruncatedQuery = strings.Repeat("message:error and level:3", 40) + mockTime = time.Date(2025, 8, 6, 17, 52, 12, 123, time.UTC) ) type mockArgs struct { @@ -47,6 +49,7 @@ func TestServeGetAsyncSearchesList(t *testing.T) { name string reqBody string + cfg config.Handlers wantRespBody string wantStatus int @@ -237,12 +240,59 @@ func TestServeGetAsyncSearchesList(t *testing.T) { reqBody: "invalid-request", wantStatus: http.StatusBadRequest, }, + { + name: "query_too_long", + reqBody: "{}", + mockArgs: &mockArgs{ + proxyReq: &seqapi.GetAsyncSearchesListRequest{}, + proxyResp: &seqapi.GetAsyncSearchesListResponse{ + Searches: []*seqapi.GetAsyncSearchesListResponse_ListItem{ + { + SearchId: mockSearchID1, + Status: seqapi.AsyncSearchStatus_ASYNC_SEARCH_STATUS_DONE, + Request: &seqapi.StartAsyncSearchRequest{ + Retention: durationpb.New(60 * time.Second), + Query: tooLongQuery, + From: timestamppb.New(mockTime.Add(-15 * time.Minute)), + To: timestamppb.New(mockTime), + WithDocs: true, + Size: 100, + }, + StartedAt: timestamppb.New(mockTime.Add(-30 * time.Second)), + ExpiresAt: timestamppb.New(mockTime.Add(30 * time.Second)), + Progress: 1, + DiskUsage: 512, + OwnerName: mockUserName1, + }, + }, + Error: &seqapi.Error{ + Code: seqapi.ErrorCode_ERROR_CODE_PARTIAL_RESPONSE, + Message: "partial response", + }, + }, + repoReq: types.GetAsyncSearchesListRequest{}, + repoResp: []types.AsyncSearchInfo{ + { + SearchID: mockSearchID1, + OwnerID: mockProfileID1, + OwnerName: mockUserName1, + }, + }, + searchIDs: []string{mockSearchID1}, + }, + wantRespBody: `{"searches":[{"search_id":"c9a34cf8-4c66-484e-9cc2-42979d848656","status":"done","request":{"retention":"seconds:60","query":"` + TruncatedQuery + `...","from":"2025-08-06T17:37:12.000000123Z","to":"2025-08-06T17:52:12.000000123Z","with_docs":true,"size":100},"started_at":"2025-08-06T17:51:42.000000123Z","expires_at":"2025-08-06T17:52:42.000000123Z","progress":1,"disk_usage":"512","owner_name":"some_user_1"}],"error":{"code":"ERROR_CODE_PARTIAL_RESPONSE","message":"partial response"}}`, + wantStatus: http.StatusOK, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() - seqData := test.APITestData{} + seqData := test.APITestData{ + AsyncCfg: config.AsyncSearch{ + ListQueryLengthLimit: 1000, + }, + } if tt.mockArgs != nil { ctrl := gomock.NewController(t) diff --git a/internal/api/seqapi/v1/http/test_data.go b/internal/api/seqapi/v1/http/test_data.go index 06a7a97..455b472 100644 --- a/internal/api/seqapi/v1/http/test_data.go +++ b/internal/api/seqapi/v1/http/test_data.go @@ -34,7 +34,7 @@ func initTestAPIWithAsyncSearches(data test.APITestData) *API { seqDBClients := map[string]seqdb.Client{ config.DefaultSeqDBClientID: data.Mocks.SeqDB, } - as := asyncsearches.New(context.Background(), data.Mocks.AsyncSearchesRepo, data.Mocks.SeqDB, []string{}) + as := asyncsearches.New(context.Background(), data.Mocks.AsyncSearchesRepo, data.Mocks.SeqDB, data.AsyncCfg) s := service.New(&repository.Repository{ UserProfiles: data.Mocks.ProfilesRepo, }) diff --git a/internal/api/seqapi/v1/test/data.go b/internal/api/seqapi/v1/test/data.go index 10c67b2..46ea63d 100644 --- a/internal/api/seqapi/v1/test/data.go +++ b/internal/api/seqapi/v1/test/data.go @@ -27,8 +27,9 @@ type Mocks struct { } type APITestData struct { - Cfg config.SeqAPI - Mocks Mocks + Cfg config.SeqAPI + AsyncCfg config.AsyncSearch + Mocks Mocks } func MakeEvent(id string, countData int, t time.Time) *seqapi.Event { diff --git a/internal/app/config/config.go b/internal/app/config/config.go index 954594b..e751e32 100644 --- a/internal/app/config/config.go +++ b/internal/app/config/config.go @@ -30,6 +30,8 @@ const ( minGRPCKeepaliveTime = 10 * time.Second minGRPCKeepaliveTimeout = 1 * time.Second + defaultAsyncSearchListQueryLengthLimit = 1000 + defaultMaxSearchTotalLimit = 1000000 defaultMaxSearchOffsetLimit = 1000000 defaultMaxAggregationsPerRequest = 1 @@ -311,7 +313,8 @@ type ErrorGroups struct { } type AsyncSearch struct { - AdminUsers []string `yaml:"admin_users"` + AdminUsers []string `yaml:"admin_users"` + ListQueryLengthLimit int `yaml:"list_query_length_limit"` } // FromFile parse config from config path. @@ -351,6 +354,9 @@ func FromFile(cfgPath string) (Config, error) { if cfg.Handlers.SeqAPI.MaxSearchOffsetLimit <= 0 { cfg.Handlers.SeqAPI.MaxSearchOffsetLimit = defaultMaxSearchOffsetLimit } + if cfg.Handlers.AsyncSearch.ListQueryLengthLimit <= 0 { + cfg.Handlers.AsyncSearch.ListQueryLengthLimit = defaultAsyncSearchListQueryLengthLimit + } if cfg.Handlers.SeqAPI.EventsCacheTTL <= 0 { cfg.Handlers.SeqAPI.EventsCacheTTL = defaultEventsCacheTTL } diff --git a/internal/pkg/service/async_searches/service.go b/internal/pkg/service/async_searches/service.go index a91102b..0960785 100644 --- a/internal/pkg/service/async_searches/service.go +++ b/internal/pkg/service/async_searches/service.go @@ -5,14 +5,17 @@ import ( "fmt" "slices" "time" + "unicode/utf8" "github.com/cenkalti/backoff/v4" "go.uber.org/zap" + "github.com/ozontech/seq-ui/internal/app/config" "github.com/ozontech/seq-ui/internal/app/types" "github.com/ozontech/seq-ui/internal/pkg/client/seqdb" "github.com/ozontech/seq-ui/internal/pkg/repository" "github.com/ozontech/seq-ui/logger" + "github.com/ozontech/seq-ui/metric" "github.com/ozontech/seq-ui/pkg/seqapi/v1" ) @@ -27,19 +30,19 @@ type Service struct { repo repository.AsyncSearches seqDB seqdb.Client - admin_users []string + cfg config.AsyncSearch } func New( ctx context.Context, repo repository.AsyncSearches, seqDB seqdb.Client, - admins []string, + cfg config.AsyncSearch, ) *Service { s := &Service{ - repo: repo, - seqDB: seqDB, - admin_users: admins, + repo: repo, + seqDB: seqDB, + cfg: cfg, } go s.deleteExpiredAsyncSearches(ctx) @@ -52,6 +55,9 @@ func (s *Service) StartAsyncSearch( ownerID int64, req *seqapi.StartAsyncSearchRequest, ) (*seqapi.StartAsyncSearchResponse, error) { + if utf8.RuneCountInString(req.Query) > s.cfg.ListQueryLengthLimit { + metric.AsyncSearchQueryTooLong.Inc() + } resp, err := s.seqDB.StartAsyncSearch(ctx, req) if err != nil { return nil, fmt.Errorf("failed to start async search: %w", err) @@ -179,6 +185,7 @@ func (s *Service) GetAsyncSearchesList( for _, as := range resp.Searches { as.OwnerName = ownerNameByID[as.SearchId] + as.Request.Query = s.trimQueryToLimit(as.Request.Query, s.cfg.ListQueryLengthLimit) } return resp, nil @@ -190,7 +197,7 @@ func (s *Service) isAdmin(ctx context.Context) bool { return false } - return slices.Index(s.admin_users, userName) >= 0 + return slices.Index(s.cfg.AdminUsers, userName) >= 0 } func (s *Service) deleteExpiredAsyncSearches(ctx context.Context) { @@ -209,3 +216,14 @@ func (s *Service) deleteExpiredAsyncSearches(ctx context.Context) { } } } + +func (s *Service) trimQueryToLimit(query string, limit int) string { + count := 0 + for i := range query { + if count == limit { + return query[:i] + "..." + } + count++ + } + return query +} diff --git a/metric/metric.go b/metric/metric.go index 272a7d1..c28790e 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -17,6 +17,7 @@ const ( repoSubsys = "repository" clickHouseSubsys = "clickhouse" massExportSubsys = "mass_export" + asyncSearchSubsys = "async_search" componentLabel = "component" methodLabel = "method" @@ -209,6 +210,14 @@ var ( Help: "", Buckets: defaultBuckets, }, []string{sessionIDLabel}) + + // async search metrics + AsyncSearchQueryTooLong = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: seqUINS, + Subsystem: asyncSearchSubsys, + Name: "requests_query_too_long_total", + Help: "", + }) ) // HandledIncomingRequest handles metrics for processed incoming request.