Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/seq-ui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func initApp(ctx context.Context, cfg config.Config) *api.Registrar {
userProfileV1 = userprofile_v1.New(svc, p)
dashboardsV1 = dashboards_v1.New(svc, p)

asyncSearchesService = asyncsearches.New(ctx, repo, defaultClient, cfg.Handlers.AsyncSearch.AdminUsers)
asyncSearchesService = asyncsearches.New(ctx, repo, defaultClient, cfg.Handlers.AsyncSearch)
}

seqApiV1 := seqapi_v1.New(cfg.Handlers.SeqAPI, seqDBClients, inmemWithRedisCache, redisCache, asyncSearchesService, p)
Expand Down
17 changes: 17 additions & 0 deletions docs/en/02-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ handlers:
seq_api:
error_groups:
mass_export:
async_search:
```

### SeqAPI
Expand Down Expand Up @@ -812,6 +813,22 @@ Config for `/massexport` API handlers.

> The value must be passed in the duration format: `<number>(ms|s|m|h)`.

### Async Search

Configuration for async search request.

**`async_search`** *`AsyncSearch`* *`optional`*

`AsyncSearch` fields:

+ **`admin_users`** *`[]string`* *`optional`*

List of users allowed to cancel or delete async requests created by other users.

+ **`list_query_length_limit`** *`int`* *`default=1000`*

Maximum length of `request.query` in async searches list responses. Requests exceeding the limit will be truncated to it

## Tracing

The tracing configuration is set through environment variables.
Expand Down
17 changes: 17 additions & 0 deletions docs/ru/02-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ handlers:
seq_api:
error_groups:
mass_export:
async_search:
```

### SeqAPI
Expand Down Expand Up @@ -812,6 +813,22 @@ handlers:

> Значение должно быть передано в `duration`-формате: `<число>(ms|s|m|h)`.

### Async Search

**`async_search`** *`AsyncSearch`* *`optional`*

Настройка функциональности отложенных поисковых запросов.

Поля `AsyncSearch`:

+ **`admin_users`** *`[]string`* *`optional`*

Список пользователей, которые могут отменять или удалять отложенные поиски других пользователей.

+ **`list_query_length_limit`** *`int`* *`default=1000`*

Максимальная длина `request.query` в ответе списка отложенных запросов. Запросы, превышающие лимит, будут обрезаны до этого значения.

## Tracing

Конфигурация трейсинга задается переменными окружения.
Expand Down
2 changes: 1 addition & 1 deletion internal/api/seqapi/v1/grpc/test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func initTestAPIWithAsyncSearches(data test.APITestData) *API {
seqDBClients := map[string]seqdb.Client{
config.DefaultSeqDBClientID: data.Mocks.SeqDB,
}
as := asyncsearches.New(context.Background(), data.Mocks.AsyncSearchesRepo, data.Mocks.SeqDB, []string{})
as := asyncsearches.New(context.Background(), data.Mocks.AsyncSearchesRepo, data.Mocks.SeqDB, data.AsyncCfg)
s := service.New(&repository.Repository{
UserProfiles: data.Mocks.ProfilesRepo,
})
Expand Down
56 changes: 53 additions & 3 deletions internal/api/seqapi/v1/http/get_async_searches_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/ozontech/seq-ui/internal/api/httputil"
"github.com/ozontech/seq-ui/internal/api/seqapi/v1/test"
"github.com/ozontech/seq-ui/internal/app/config"
"github.com/ozontech/seq-ui/internal/app/types"
mock_seqdb "github.com/ozontech/seq-ui/internal/pkg/client/seqdb/mock"
mock_repo "github.com/ozontech/seq-ui/internal/pkg/repository/mock"
Expand All @@ -28,8 +29,9 @@ func TestServeGetAsyncSearchesList(t *testing.T) {
mockProfileID1 int64 = 1
mockProfileID2 int64 = 1
errorMsg = "some error"

mockTime = time.Date(2025, 8, 6, 17, 52, 12, 123, time.UTC)
tooLongQuery = strings.Repeat("message:error and level:3", 41)
TruncatedQuery = strings.Repeat("message:error and level:3", 40)
mockTime = time.Date(2025, 8, 6, 17, 52, 12, 123, time.UTC)
)

type mockArgs struct {
Expand All @@ -47,6 +49,7 @@ func TestServeGetAsyncSearchesList(t *testing.T) {
name string

reqBody string
cfg config.Handlers
wantRespBody string
wantStatus int

Expand Down Expand Up @@ -237,12 +240,59 @@ func TestServeGetAsyncSearchesList(t *testing.T) {
reqBody: "invalid-request",
wantStatus: http.StatusBadRequest,
},
{
name: "query_too_long",
reqBody: "{}",
mockArgs: &mockArgs{
proxyReq: &seqapi.GetAsyncSearchesListRequest{},
proxyResp: &seqapi.GetAsyncSearchesListResponse{
Searches: []*seqapi.GetAsyncSearchesListResponse_ListItem{
{
SearchId: mockSearchID1,
Status: seqapi.AsyncSearchStatus_ASYNC_SEARCH_STATUS_DONE,
Request: &seqapi.StartAsyncSearchRequest{
Retention: durationpb.New(60 * time.Second),
Query: tooLongQuery,
From: timestamppb.New(mockTime.Add(-15 * time.Minute)),
To: timestamppb.New(mockTime),
WithDocs: true,
Size: 100,
},
StartedAt: timestamppb.New(mockTime.Add(-30 * time.Second)),
ExpiresAt: timestamppb.New(mockTime.Add(30 * time.Second)),
Progress: 1,
DiskUsage: 512,
OwnerName: mockUserName1,
},
},
Error: &seqapi.Error{
Code: seqapi.ErrorCode_ERROR_CODE_PARTIAL_RESPONSE,
Message: "partial response",
},
},
repoReq: types.GetAsyncSearchesListRequest{},
repoResp: []types.AsyncSearchInfo{
{
SearchID: mockSearchID1,
OwnerID: mockProfileID1,
OwnerName: mockUserName1,
},
},
searchIDs: []string{mockSearchID1},
},
wantRespBody: `{"searches":[{"search_id":"c9a34cf8-4c66-484e-9cc2-42979d848656","status":"done","request":{"retention":"seconds:60","query":"` + TruncatedQuery + `...","from":"2025-08-06T17:37:12.000000123Z","to":"2025-08-06T17:52:12.000000123Z","with_docs":true,"size":100},"started_at":"2025-08-06T17:51:42.000000123Z","expires_at":"2025-08-06T17:52:42.000000123Z","progress":1,"disk_usage":"512","owner_name":"some_user_1"}],"error":{"code":"ERROR_CODE_PARTIAL_RESPONSE","message":"partial response"}}`,
wantStatus: http.StatusOK,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

seqData := test.APITestData{}
seqData := test.APITestData{
AsyncCfg: config.AsyncSearch{
ListQueryLengthLimit: 1000,
},
}

if tt.mockArgs != nil {
ctrl := gomock.NewController(t)
Expand Down
2 changes: 1 addition & 1 deletion internal/api/seqapi/v1/http/test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func initTestAPIWithAsyncSearches(data test.APITestData) *API {
seqDBClients := map[string]seqdb.Client{
config.DefaultSeqDBClientID: data.Mocks.SeqDB,
}
as := asyncsearches.New(context.Background(), data.Mocks.AsyncSearchesRepo, data.Mocks.SeqDB, []string{})
as := asyncsearches.New(context.Background(), data.Mocks.AsyncSearchesRepo, data.Mocks.SeqDB, data.AsyncCfg)
s := service.New(&repository.Repository{
UserProfiles: data.Mocks.ProfilesRepo,
})
Expand Down
5 changes: 3 additions & 2 deletions internal/api/seqapi/v1/test/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ type Mocks struct {
}

type APITestData struct {
Cfg config.SeqAPI
Mocks Mocks
Cfg config.SeqAPI
AsyncCfg config.AsyncSearch
Mocks Mocks
}

func MakeEvent(id string, countData int, t time.Time) *seqapi.Event {
Expand Down
8 changes: 7 additions & 1 deletion internal/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const (
minGRPCKeepaliveTime = 10 * time.Second
minGRPCKeepaliveTimeout = 1 * time.Second

defaultAsyncSearchListQueryLengthLimit = 1000

defaultMaxSearchTotalLimit = 1000000
defaultMaxSearchOffsetLimit = 1000000
defaultMaxAggregationsPerRequest = 1
Expand Down Expand Up @@ -311,7 +313,8 @@ type ErrorGroups struct {
}

type AsyncSearch struct {
AdminUsers []string `yaml:"admin_users"`
AdminUsers []string `yaml:"admin_users"`
ListQueryLengthLimit int `yaml:"list_query_length_limit"`
}

// FromFile parse config from config path.
Expand Down Expand Up @@ -351,6 +354,9 @@ func FromFile(cfgPath string) (Config, error) {
if cfg.Handlers.SeqAPI.MaxSearchOffsetLimit <= 0 {
cfg.Handlers.SeqAPI.MaxSearchOffsetLimit = defaultMaxSearchOffsetLimit
}
if cfg.Handlers.AsyncSearch.ListQueryLengthLimit <= 0 {
cfg.Handlers.AsyncSearch.ListQueryLengthLimit = defaultAsyncSearchListQueryLengthLimit
}
if cfg.Handlers.SeqAPI.EventsCacheTTL <= 0 {
cfg.Handlers.SeqAPI.EventsCacheTTL = defaultEventsCacheTTL
}
Expand Down
30 changes: 24 additions & 6 deletions internal/pkg/service/async_searches/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import (
"fmt"
"slices"
"time"
"unicode/utf8"

"github.com/cenkalti/backoff/v4"
"go.uber.org/zap"

"github.com/ozontech/seq-ui/internal/app/config"
"github.com/ozontech/seq-ui/internal/app/types"
"github.com/ozontech/seq-ui/internal/pkg/client/seqdb"
"github.com/ozontech/seq-ui/internal/pkg/repository"
"github.com/ozontech/seq-ui/logger"
"github.com/ozontech/seq-ui/metric"
"github.com/ozontech/seq-ui/pkg/seqapi/v1"
)

Expand All @@ -27,19 +30,19 @@ type Service struct {
repo repository.AsyncSearches
seqDB seqdb.Client

admin_users []string
cfg config.AsyncSearch
}

func New(
ctx context.Context,
repo repository.AsyncSearches,
seqDB seqdb.Client,
admins []string,
cfg config.AsyncSearch,
) *Service {
s := &Service{
repo: repo,
seqDB: seqDB,
admin_users: admins,
repo: repo,
seqDB: seqDB,
cfg: cfg,
}

go s.deleteExpiredAsyncSearches(ctx)
Expand All @@ -52,6 +55,9 @@ func (s *Service) StartAsyncSearch(
ownerID int64,
req *seqapi.StartAsyncSearchRequest,
) (*seqapi.StartAsyncSearchResponse, error) {
if utf8.RuneCountInString(req.Query) > s.cfg.ListQueryLengthLimit {
metric.AsyncSearchQueryTooLong.Inc()
}
resp, err := s.seqDB.StartAsyncSearch(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to start async search: %w", err)
Expand Down Expand Up @@ -179,6 +185,7 @@ func (s *Service) GetAsyncSearchesList(

for _, as := range resp.Searches {
as.OwnerName = ownerNameByID[as.SearchId]
as.Request.Query = s.trimQueryToLimit(as.Request.Query, s.cfg.ListQueryLengthLimit)
}

return resp, nil
Expand All @@ -190,7 +197,7 @@ func (s *Service) isAdmin(ctx context.Context) bool {
return false
}

return slices.Index(s.admin_users, userName) >= 0
return slices.Index(s.cfg.AdminUsers, userName) >= 0
}

func (s *Service) deleteExpiredAsyncSearches(ctx context.Context) {
Expand All @@ -209,3 +216,14 @@ func (s *Service) deleteExpiredAsyncSearches(ctx context.Context) {
}
}
}

func (s *Service) trimQueryToLimit(query string, limit int) string {
count := 0
for i := range query {
if count == limit {
return query[:i] + "..."
}
count++
}
return query
}
9 changes: 9 additions & 0 deletions metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
repoSubsys = "repository"
clickHouseSubsys = "clickhouse"
massExportSubsys = "mass_export"
asyncSearchSubsys = "async_search"

componentLabel = "component"
methodLabel = "method"
Expand Down Expand Up @@ -209,6 +210,14 @@ var (
Help: "",
Buckets: defaultBuckets,
}, []string{sessionIDLabel})

// async search metrics
AsyncSearchQueryTooLong = promauto.NewCounter(prometheus.CounterOpts{
Namespace: seqUINS,
Subsystem: asyncSearchSubsys,
Name: "requests_query_too_long_total",
Help: "",
})
)

// HandledIncomingRequest handles metrics for processed incoming request.
Expand Down
Loading