4 changes: 2 additions & 2 deletions assets/databases/heimdall/data/job_statuses.sql
@@ -10,8 +10,8 @@ values
(4, 'FAILED'),
(5, 'KILLED'),
(6, 'SUCCEEDED'),
(7, 'CANCELLING'),
(8, 'CANCELLED')
(7, 'CANCELING'),
(8, 'CANCELED')
on conflict (job_status_id) do update
set
job_status_name = excluded.job_status_name;
12 changes: 10 additions & 2 deletions assets/databases/heimdall/tables/jobs.sql
@@ -20,5 +20,13 @@ create table if not exists jobs
);

alter table jobs add column if not exists store_result_sync boolean not null default false;
alter table jobs add column if not exists cancelled_by varchar(64) null;
update jobs set cancelled_by = '' where cancelled_by is null;
alter table jobs add column if not exists canceled_by varchar(64) null;

-- Originally had "cancelled_by" column and "cancelling" status, but we aren't British. Whoops.
do $$ begin
if exists (select 1 from information_schema.columns where table_name = 'jobs' and column_name = 'cancelled_by') then
update jobs set canceled_by = cancelled_by where canceled_by is null and cancelled_by is not null;
alter table jobs drop column cancelled_by;
end if;
end $$;
update jobs set canceled_by = '' where canceled_by is null;
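
The do-block above makes the rename re-runnable: on a database that still has the old column it copies any legacy cancelled_by values into canceled_by before dropping it, and on a fresh database it is a no-op. An illustrative post-migration check (not part of this changeset):

-- expect exactly one row, 'canceled_by'; the British spelling should be gone
select column_name
from information_schema.columns
where table_name = 'jobs'
  and column_name in ('canceled_by', 'cancelled_by');
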
6 changes: 4 additions & 2 deletions cmd/heimdall/heimdall.go
@@ -20,6 +20,7 @@ const (
defaultReadHeaderTimeout = 2 // seconds
defaultJanitorKeepalive = 5 // seconds
defaultStaleJob = 45 // seconds
defaultCleanInterval = 60 // seconds
)

var (
@@ -46,8 +47,9 @@ func main() {
ReadHeaderTimeout: defaultReadHeaderTimeout,
},
Janitor: &janitor.Janitor{
Keepalive: defaultJanitorKeepalive,
StaleJob: defaultStaleJob,
Keepalive: defaultJanitorKeepalive,
StaleJob: defaultStaleJob,
CleanInterval: defaultCleanInterval,
},
}

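
The new clean_interval default joins the existing janitor settings here. Judging by the yaml tags on the Janitor struct (see internal/pkg/janitor/janitor.go below), all three knobs can presumably be overridden in the Heimdall config file; a hypothetical snippet, with field names taken from the struct tags and the placement of the janitor section assumed:

# all values are in seconds
janitor:
  keepalive: 5
  stale_job: 45
  clean_interval: 60
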
11 changes: 6 additions & 5 deletions internal/pkg/heimdall/heimdall.go
@@ -55,7 +55,7 @@ type Heimdall struct {
Janitor *janitor.Janitor `yaml:"janitor,omitempty" json:"janitor,omitempty"`
Version string `yaml:"-" json:"-"`
agentName string
commandHandlers map[string]plugin.Handler
commandHandlers map[string]*plugin.Handlers
}

func (h *Heimdall) Init() error {
@@ -96,7 +96,7 @@ func (h *Heimdall) Init() error {
rbacsByName[rbacName] = r
}

h.commandHandlers = make(map[string]plugin.Handler)
h.commandHandlers = make(map[string]*plugin.Handlers)

// process commands / add default values if missing, write commands to db
for _, c := range h.Commands {
@@ -112,11 +112,12 @@
return fmt.Errorf(formatErrUnknownPlugin, c.Plugin)
}

handler, err := pluginNew(c.Context)
handlers, err := pluginNew(c.Context)
if err != nil {
return err
}
h.commandHandlers[c.ID] = handler

h.commandHandlers[c.ID] = handlers

// let's record command in the database
if err := h.commandUpsert(c); err != nil {
@@ -151,7 +152,7 @@
}

// start janitor
if err := h.Janitor.Start(h.Database); err != nil {
if err := h.Janitor.Start(h.Database, h.commandHandlers, h.Clusters); err != nil {
return err
}

17 changes: 9 additions & 8 deletions internal/pkg/heimdall/job.go
@@ -117,14 +117,15 @@ func (h *Heimdall) runJob(ctx context.Context, j *job.Job, command *command.Comm
jobDone := make(chan error, 1)
cancelMonitorDone := make(chan struct{})

// Create cancellable context for the job
// Create cancelable context for the job
pluginCtx, cancel := context.WithCancel(ctx)
defer cancel()

// Start plugin execution in goroutine
go func() {
defer close(cancelMonitorDone) // signal monitoring to stop
err := h.commandHandlers[command.ID](pluginCtx, runtime, j, cluster)
handlers := h.commandHandlers[command.ID]
err := handlers.Handler(pluginCtx, runtime, j, cluster)
jobDone <- err
}()

@@ -140,10 +141,10 @@
case <-cancelMonitorDone:
return
case <-ticker.C:
// If job is in cancelling state, trigger context cancellation
// If job is in canceling state, trigger context cancellation
result, err := h.getJobStatus(ctx, &jobRequest{ID: j.ID})
if err == nil {
if job, ok := result.(*job.Job); ok && job.Status == jobStatus.Cancelling {
if job, ok := result.(*job.Job); ok && job.Status == jobStatus.Canceling {
cancel()
return
}
@@ -156,14 +157,14 @@
// Wait for job execution to complete
jobErr := <-jobDone

// Check if context was cancelled and mark status appropriately
// Check if context was canceled and mark status appropriately
if pluginCtx.Err() != nil {
j.Status = jobStatus.Cancelling // janitor will update to cancelled when resources are cleaned up
j.Status = jobStatus.Canceling // janitor will update to canceled when resources are cleaned up
runJobMethod.LogAndCountError(pluginCtx.Err(), command.Name, cluster.Name)
return nil
}

// Handle plugin execution result (only if not cancelled)
// Handle plugin execution result (only if not canceled)
if jobErr != nil {
j.Status = jobStatus.Failed
j.Error = jobErr.Error()
@@ -236,7 +237,7 @@ func (h *Heimdall) cancelJob(ctx context.Context, req *jobRequest) (any, error)

// return job status
return &job.Job{
Status: jobStatus.Cancelling,
Status: jobStatus.Canceling,
}, nil
}

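
Worth noting: this cancellation path is cooperative. runJob polls the job status on each tick and calls cancel(), but the plugin only stops if its handler actually watches pluginCtx. A minimal sketch of the shape a cooperative handler needs, with heimdall's runtime, job, and cluster parameters elided since their exact types aren't shown in this hunk:

package sketch

import (
	"context"
	"time"
)

// doWork stands in for one unit of the plugin's real work (hypothetical).
func doWork() {}

// handler sketches a cooperative plugin handler; the real hp.Handler
// signature also takes runtime, job, and cluster arguments.
func handler(ctx context.Context) error {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// runJob sees pluginCtx.Err() != nil, marks the job CANCELING,
			// and the janitor later moves it to CANCELED.
			return ctx.Err()
		case <-ticker.C:
			doWork() // one unit of work, then re-check the context
		}
	}
}
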
6 changes: 3 additions & 3 deletions internal/pkg/heimdall/job_dal.go
@@ -113,7 +113,7 @@ func (h *Heimdall) insertJob(j *job.Job, clusterID, commandID string) (int64, er
defer sess.Close()

// insert job row
jobID, err := sess.InsertRow(queryJobInsert, clusterID, commandID, j.Status, j.ID, j.Name, j.Version, j.Description, j.Context.String(), j.Error, j.User, j.IsSync, j.StoreResultSync, j.CancelledBy)
jobID, err := sess.InsertRow(queryJobInsert, clusterID, commandID, j.Status, j.ID, j.Name, j.Version, j.Description, j.Context.String(), j.Error, j.User, j.IsSync, j.StoreResultSync, j.CanceledBy)
if err != nil {
return 0, err
}
@@ -200,7 +200,7 @@ func (h *Heimdall) getJob(ctx context.Context, j *jobRequest) (any, error) {
var jobContext string

if err := row.Scan(&r.SystemID, &r.Status, &r.Name, &r.Version, &r.Description, &jobContext, &r.Error, &r.User, &r.IsSync,
&r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CancelledBy); err != nil {
&r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy); err != nil {
if err == sql.ErrNoRows {
return nil, ErrUnknownJobID
} else {
@@ -253,7 +253,7 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error)
r := &job.Job{}

if err := rows.Scan(&r.SystemID, &r.ID, &r.Status, &r.Name, &r.Version, &r.Description, &jobContext, &r.Error, &r.User, &r.IsSync,
&r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CancelledBy); err != nil {
&r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy); err != nil {
getJobsMethod.LogAndCountError(err, "scan")
return nil, err
}
25 changes: 21 additions & 4 deletions internal/pkg/heimdall/plugins.go
@@ -16,9 +16,9 @@ const (
pluginExtensionLength = len(pluginExtension)
)

func (h *Heimdall) loadPlugins() (map[string]func(*context.Context) (hp.Handler, error), error) {
func (h *Heimdall) loadPlugins() (map[string]func(*context.Context) (*hp.Handlers, error), error) {

plugins := make(map[string]func(*context.Context) (hp.Handler, error))
plugins := make(map[string]func(*context.Context) (*hp.Handlers, error))

files, err := os.ReadDir(h.PluginsDirectory)
if err != nil {
@@ -35,11 +35,28 @@ func (h *Heimdall) loadPlugins() (map[string]func(*context.Context) (hp.Handler,
if err != nil {
return nil, err
}
// is it our plugin?
newPluginFunc, ok := newFunc.(func(*context.Context) (hp.Handler, error))
// try new signature first: func(*context.Context) (*hp.Handlers, error)
newPluginFunc, ok := newFunc.(func(*context.Context) (*hp.Handlers, error))
if ok {
plugins[stripExtension(file.Name())] = newPluginFunc
continue
}

// make backward compatible with old signature: func(*context.Context) (hp.Handler, error)
oldPluginFunc, ok := newFunc.(func(*context.Context) (hp.Handler, error))
if ok {
plugins[stripExtension(file.Name())] = func(ctx *context.Context) (*hp.Handlers, error) {
handler, err := oldPluginFunc(ctx)
if err != nil {
return nil, err
}
return &hp.Handlers{
Handler: handler,
CleanupHandler: nil,
}, nil
}
}

}
}

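
For plugin authors, the takeaway is that new plugins can export the Handlers-returning constructor directly and skip the compat shim. A hypothetical plugin source, with the caveat that the exported symbol name ("New") and the context import path aren't visible in this hunk and are assumed:

// Built with: go build -buildmode=plugin (no func main needed).
package main

import (
	hctx "github.com/patterninc/heimdall/pkg/context" // assumed import path
	hp "github.com/patterninc/heimdall/pkg/plugin"    // as imported by janitor.go
)

// run is this plugin's job handler; hp.Handler's full signature isn't shown
// in this hunk, so it is referenced only by type here.
var run hp.Handler

// New is assumed to be the symbol loadPlugins looks up in the .so file.
func New(_ *hctx.Context) (*hp.Handlers, error) {
	return &hp.Handlers{
		Handler:        run, // required: executes the job
		CleanupHandler: nil, // optional; nil mirrors the compat shim above
	}, nil
}
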
1 change: 1 addition & 0 deletions internal/pkg/heimdall/queries/job/active_select.sql
@@ -22,6 +22,7 @@ with a as
join clusters cl on cl.system_cluster_id = jj.job_cluster_id
where
aj.agent_name is null
and jj.job_status_id != 7 -- Not canceling. Jobs can be canceled before being assigned to an agent.
Contributor Author: Jobs with the status of "NEW" have not yet been assigned an agent. These jobs CAN be canceled, though. This WHERE clause prevents those canceled jobs from being picked up and sent to workers.

order by
aj.system_job_id
for update
4 changes: 2 additions & 2 deletions internal/pkg/heimdall/queries/job/insert.sql
@@ -12,7 +12,7 @@ insert into jobs
username,
is_sync,
store_result_sync,
cancelled_by
canceled_by
)
select
cm.system_command_id,
@@ -27,7 +27,7 @@ select
$10, -- username
$11, -- is_sync
$12, -- store_result_sync
$13 -- cancelled_by
$13 -- canceled_by
from
clusters cl,
commands cm
2 changes: 1 addition & 1 deletion internal/pkg/heimdall/queries/job/select.sql
@@ -15,7 +15,7 @@ select
cl.cluster_id,
cl.cluster_name,
j.store_result_sync,
j.cancelled_by
j.canceled_by
from
jobs j
left join commands cm on cm.system_command_id = j.job_command_id
2 changes: 1 addition & 1 deletion internal/pkg/heimdall/queries/job/select_jobs.sql
@@ -16,7 +16,7 @@ select
cl.cluster_id,
cl.cluster_name,
j.store_result_sync,
j.cancelled_by
j.canceled_by
from
jobs j
join job_statuses js on js.job_status_id = j.job_status_id
6 changes: 3 additions & 3 deletions internal/pkg/heimdall/queries/job/status_cancel_update.sql
@@ -1,8 +1,8 @@
update jobs
set
job_status_id = 7, -- CANCELLING
cancelled_by = $2,
job_status_id = 7, -- CANCELING
canceled_by = $2,
updated_at = extract(epoch from now())::int
where
job_id = $1
and job_status_id not in (4, 5, 6, 8); -- Not in FAILED, KILLED, SUCCEEDED, CANCELLED
and job_status_id not in (4, 5, 6, 8); -- Not in FAILED, KILLED, SUCCEEDED, CANCELED
49 changes: 32 additions & 17 deletions internal/pkg/janitor/janitor.go
@@ -1,41 +1,56 @@
package janitor

import (
"fmt"
"time"

"github.com/hladush/go-telemetry/pkg/telemetry"
"github.com/patterninc/heimdall/internal/pkg/database"
"github.com/patterninc/heimdall/pkg/object/cluster"
"github.com/patterninc/heimdall/pkg/object/job"
"github.com/patterninc/heimdall/pkg/plugin"
)

const (
defaultJobLimit = 25
defaultNumWorkers = 3
)

var (
janitorStartMethod = telemetry.NewMethod("janitor", "start")
)

type Janitor struct {
Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"`
StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"`
db *database.Database
Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"`
StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"`
CleanInterval int `yaml:"clean_interval,omitempty" json:"clean_interval,omitempty"`
db *database.Database
commandHandlers map[string]*plugin.Handlers
clusters cluster.Clusters
}

func (j *Janitor) Start(d *database.Database) error {
func (j *Janitor) Start(d *database.Database, commandHandlers map[string]*plugin.Handlers, clusters cluster.Clusters) error {

// record database context
j.db = d
j.commandHandlers = commandHandlers
j.clusters = clusters

// create channel for cleanup jobs
jobChan := make(chan *job.Job, defaultJobLimit*2)

// let's run jobs cleanup once before we start it as a go routine
if err := j.cleanupStaleJobs(); err != nil {
return err
// start cleanup workers
for i := 0; i < defaultNumWorkers; i++ {
go j.cleanupWorker(jobChan)
}

// start cleanup loop
// send jobs to channel
go func() {

for {

if err := j.cleanupStaleJobs(); err != nil {
fmt.Println(`Janitor error:`, err)
if err := j.queryAndSendJobs(jobChan); err != nil {
janitorStartMethod.CountError("query_and_send_jobs")
}

time.Sleep(60 * time.Second)

time.Sleep(time.Duration(j.CleanInterval) * time.Second)
}

}()

return nil
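
cleanupWorker and queryAndSendJobs are referenced here but defined outside the hunk. A hypothetical sketch of the worker side of the fan-out, meant to live in package janitor alongside the code above; cleanupJob is a stand-in name for the per-job logic that invokes the plugin's CleanupHandler and advances the job from CANCELING to CANCELED:

// Hypothetical: each of the defaultNumWorkers goroutines drains the shared
// jobChan until it is closed.
func (j *Janitor) cleanupWorker(jobChan <-chan *job.Job) {
	for jb := range jobChan {
		// cleanupJob is a stand-in; the real per-job cleanup is not shown
		// in this hunk.
		if err := j.cleanupJob(jb); err != nil {
			janitorStartMethod.CountError("cleanup_job")
		}
	}
}
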