-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinit.go
More file actions
121 lines (103 loc) · 3.42 KB
/
init.go
File metadata and controls
121 lines (103 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package peerdb
import (
"context"
"github.com/elastic/go-elasticsearch/v9"
"github.com/hashicorp/go-cleanhttp"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rs/zerolog"
"gitlab.com/tozd/go/errors"
internalBase "gitlab.com/peerdb/peerdb/internal/base"
internalSearch "gitlab.com/peerdb/peerdb/internal/search"
internalStore "gitlab.com/peerdb/peerdb/internal/store"
)
// init initializes the store, coordinator, storage, and bridge for a specific site.
//
// It can be called multiple times. In that case it will not initialize again if
// the site has already been initialized.
func (s *Site) init(ctx context.Context, logger zerolog.Logger, dbpool *pgxpool.Pool, esClient *elasticsearch.TypedClient, shards int) (func(), errors.E) {
if s.initialized {
return nil, nil //nolint:nilnil
}
s.initialized = true
ctx = WithFallbackDBContext(ctx, s.Schema, "init")
ctx = logger.With().Str("schema", s.Schema).Str("index", s.Index).Logger().WithContext(ctx)
b, riverClient, onShutdown, errE := internalBase.InitAndStartComponents(ctx, logger, dbpool, esClient, s.Schema, s.Index, shards, s.LanguagePriority)
if errE != nil {
return onShutdown, errE
}
s.Base = b
s.DBPool = dbpool
s.ESClient = esClient
s.RiverClient = riverClient
return onShutdown, nil
}
// Init initializes PeerDB for all sites defined in globals.
//
// It establishes connections to PostgreSQL database and ElasticSearch.
// It configures PostgreSQL schemas and ElasticSearch indices.
//
// It can be called multiple times. In that case it will initialize only
// sites which have not been initialized yet.
//
// You have to run site.Start for each site after this call to start the
// base for each site.
func Init(ctx context.Context, globals *Globals) (func(), errors.E) {
var dbpool *pgxpool.Pool
var esClient *elasticsearch.TypedClient
// First we check if any site have them initialized already.
for _, site := range globals.Sites {
if dbpool == nil && site.DBPool != nil {
dbpool = site.DBPool
}
if esClient == nil && site.ESClient != nil {
esClient = site.ESClient
}
if dbpool != nil && esClient != nil {
break
}
}
onShutdown := []func(){}
onShutdownF := func() {
for _, f := range onShutdown {
f()
}
}
// Initialize for the first time.
if dbpool == nil {
var errE errors.E
var dbpoolCleanup func()
// We use context.WithoutCancel here because we want to cancel the pool ourselves and not when context
// is cancelled (so that cleanup code which needs PostgreSQL access can continue to use connections).
dbpool, dbpoolCleanup, errE = internalStore.InitPostgres(
context.WithoutCancel(ctx),
string(globals.Postgres.URL),
globals.Logger,
getRequestWithFallback(),
)
if errE != nil {
return nil, errE
}
// We want dbpoolCleanup to be last.
onShutdown = append(onShutdown, dbpoolCleanup)
}
// Initialize for the first time.
if esClient == nil {
var errE errors.E
esClient, errE = internalSearch.GetClient(cleanhttp.DefaultPooledClient(), globals.Logger, globals.Elastic.URL)
if errE != nil {
return onShutdownF, errE
}
}
for i := range globals.Sites {
site := &globals.Sites[i]
onS, errE := site.init(ctx, globals.Logger, dbpool, esClient, globals.Elastic.Shards)
// We want existing onShutdown functions (e.g., dbpool.Close) to be last.
if onS != nil {
onShutdown = append([]func(){onS}, onShutdown...)
}
if errE != nil {
return onShutdownF, errE
}
}
return onShutdownF, nil
}