Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.wgzhao.addax.admin.common;

/**
* Source-level collection date policy.
*/
public enum CollectDateMode
{
DAILY,
WEEKDAY,
WEEKEND
}

Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.wgzhao.addax.admin.model;

import com.wgzhao.addax.admin.common.DbType;
import com.wgzhao.addax.admin.common.CollectDateMode;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
Expand Down Expand Up @@ -44,6 +47,10 @@ public class EtlSource
@Column(name = "start_at")
private LocalTime startAt;

@Enumerated(EnumType.STRING)
@Column(name = "collect_date_mode", length = 16, nullable = false)
private CollectDateMode collectDateMode = CollectDateMode.DAILY;

@Column(name = "prerequisite", length = 4000)
private String prerequisite;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class VwEtlTableWithSource
* 采集源的调度时间点(HH:mm:ss / HH:mm)
*/
private String sourceStartAt;
private String collectDateMode;

/**
* 表级调度时间点(HH:mm:ss / HH:mm),可为空,表示继承 sourceStartAt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ where t.status not in ( 'X' ,'U')
@Query("UPDATE EtlTable t SET t.status = 'N', t.retryCnt = 3 where t.status not in ( 'X', 'U')")
void resetAllEtlFlags();

@Modifying
@Query("UPDATE EtlTable t SET t.status = 'N', t.retryCnt = 3 where t.status not in ( 'X', 'U') and t.sid in :sourceIds")
void resetEtlFlagsBySourceIds(@Param("sourceIds") List<Integer> sourceIds);

@Query(value = """
select t
from EtlTable t
Expand All @@ -48,14 +52,16 @@ select count(*) from etl_table t left join etl_source s on t.sid = s.id
@Query("""
SELECT t FROM EtlTable t JOIN EtlSource s on t.sid = s.id
WHERE t.status NOT IN ('Y','X','U') AND t.retryCnt > 0
AND t.sid in :sourceIds
AND (
:checkTime = false OR
(s.startAt > :switchTime AND s.startAt < :currentTime)
)
""")
List<EtlTable> findRunnableTasks(@Param("switchTime") LocalTime switchTime,
@Param("currentTime") LocalTime currentTime,
@Param("checkTime") boolean checkTime);
@Param("checkTime") boolean checkTime,
@Param("sourceIds") List<Integer> sourceIds);

/**
* 查询某个 source 下“继承调度”的可运行任务(表 startAt 为空,实际调度由 source.startAt 决定)
Expand All @@ -64,38 +70,44 @@ List<EtlTable> findRunnableTasks(@Param("switchTime") LocalTime switchTime,
SELECT t FROM EtlTable t JOIN EtlSource s on t.sid = s.id
WHERE s.enabled = true
AND t.sid = :sid
AND t.sid in :sourceIds
AND t.startAt is null
AND t.status NOT IN ('Y','X','U')
AND t.retryCnt > 0
""")
List<EtlTable> findRunnableInheritedTasksBySource(@Param("sid") int sid);
List<EtlTable> findRunnableInheritedTasksBySource(@Param("sid") int sid,
@Param("sourceIds") List<Integer> sourceIds);

/**
* 查询“表级覆盖调度”的可运行任务(表 startAt 等于指定时间点)
*/
@Query("""
SELECT t FROM EtlTable t JOIN EtlSource s on t.sid = s.id
WHERE s.enabled = true
AND t.sid in :sourceIds
AND t.startAt = :startAt
AND t.status = 'N'
AND t.retryCnt > 0
""")
List<EtlTable> findRunnableOverrideTasksByStartAt(@Param("startAt") LocalTime startAt);
List<EtlTable> findRunnableOverrideTasksByStartAt(@Param("startAt") LocalTime startAt,
@Param("sourceIds") List<Integer> sourceIds);

/**
* 查询“表级覆盖调度”的可运行任务(表 startAt 落在指定时间窗口内,闭区间)。
*/
@Query("""
SELECT t FROM EtlTable t JOIN EtlSource s on t.sid = s.id
WHERE s.enabled = true
AND t.sid in :sourceIds
AND t.startAt is not null
AND t.startAt >= :from
AND t.startAt <= :to
AND t.status = 'N'
AND t.retryCnt > 0
""")
List<EtlTable> findRunnableOverrideTasksBetween(@Param("from") LocalTime from,
@Param("to") LocalTime to);
@Param("to") LocalTime to,
@Param("sourceIds") List<Integer> sourceIds);

int countBySid(int sid);

Expand All @@ -106,4 +118,10 @@ List<EtlTable> findRunnableOverrideTasksBetween(@Param("from") LocalTime from,
WHERE t.status <> 'X' AND s.enabled = true
""")
List<EtlTable> findCanRefreshTables();

@Query("""
SELECT t FROM EtlTable t JOIN EtlSource s on t.sid = s.id
WHERE t.status <> 'X' AND s.enabled = true AND t.sid in :sourceIds
""")
List<EtlTable> findCanRefreshTablesBySourceIds(@Param("sourceIds") List<Integer> sourceIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.wgzhao.addax.admin.model.EtlSource;
import com.wgzhao.addax.admin.redis.RedisLockService;
import com.wgzhao.addax.admin.repository.EtlSourceRepo;
import com.wgzhao.addax.admin.service.SourceScheduleMatcher;
import com.wgzhao.addax.admin.service.SystemConfigService;
import com.wgzhao.addax.admin.service.TaskSchedulerService;
import com.wgzhao.addax.admin.service.TaskService;
import lombok.AllArgsConstructor;
Expand All @@ -12,6 +14,7 @@
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.List;

Expand All @@ -24,6 +27,8 @@ public class CollectionScheduler
private final EtlSourceRepo etlSourceRepo;
private final TaskService taskService;
private final RedisLockService redisLockService;
private final SourceScheduleMatcher sourceScheduleMatcher;
private final SystemConfigService configService;

@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady()
Expand Down Expand Up @@ -61,6 +66,12 @@ public void scheduleOrUpdateTask(EtlSource source)
log.info("Could not acquire lock for source {}, skipping this run", source.getCode());
return;
}
LocalDate bizDate = configService.getBizDateAsDate();
if (!sourceScheduleMatcher.matches(source, bizDate)) {
log.info("Skip scheduled collection for source {} because collectDateMode={} does not match bizDate={}",
source.getCode(), source.getCollectDateMode(), bizDate);
return;
}
// keep existing behavior: enqueue runnable tables for this source
taskService.executeTasksForSource(source.getId());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.wgzhao.addax.admin.service;

import com.wgzhao.addax.admin.common.CollectDateMode;
import com.wgzhao.addax.admin.model.EtlSource;
import org.springframework.stereotype.Component;

import java.time.DayOfWeek;
import java.time.LocalDate;
import java.util.List;

@Component
public class SourceScheduleMatcher
{
public boolean matches(EtlSource source, LocalDate bizDate)
{
if (source == null || bizDate == null) {
return false;
}
return matches(resolveMode(source), bizDate.getDayOfWeek());
}

public List<Integer> resolveMatchedEnabledSourceIds(List<EtlSource> enabledSources, LocalDate bizDate)
{
if (enabledSources == null || enabledSources.isEmpty() || bizDate == null) {
return List.of();
}
DayOfWeek dayOfWeek = bizDate.getDayOfWeek();
return enabledSources.stream()
.filter(EtlSource::isEnabled)
.filter(source -> matches(resolveMode(source), dayOfWeek))
.map(EtlSource::getId)
.toList();
}

private boolean matches(CollectDateMode mode, DayOfWeek dayOfWeek)
{
if (mode == CollectDateMode.WEEKDAY) {
return dayOfWeek != DayOfWeek.SATURDAY && dayOfWeek != DayOfWeek.SUNDAY;
}
if (mode == CollectDateMode.WEEKEND) {
return dayOfWeek == DayOfWeek.SATURDAY || dayOfWeek == DayOfWeek.SUNDAY;
}
return true;
}

private CollectDateMode resolveMode(EtlSource source)
{
return source.getCollectDateMode() == null ? CollectDateMode.DAILY : source.getCollectDateMode();
}
}

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.wgzhao.addax.admin.service;

import com.wgzhao.addax.admin.common.DbType;
import com.wgzhao.addax.admin.common.CollectDateMode;
import com.wgzhao.addax.admin.dto.TableMetaDto;
import com.wgzhao.addax.admin.event.SourceUpdatedEvent;
import com.wgzhao.addax.admin.model.EtlSource;
Expand Down Expand Up @@ -117,11 +118,19 @@ public EtlSource save(EtlSource etlSource)
etlSource.setDbType(parsed.getValue());
}

if (etlSource.getCollectDateMode() == null) {
etlSource.setCollectDateMode(CollectDateMode.DAILY);
}

etlSourceRepo.save(etlSource);
if (existing == null) {
return etlSource;
}
boolean scheduleChanged = existing.getStartAt() != etlSource.getStartAt();
CollectDateMode existingCollectDateMode = existing.getCollectDateMode() == null ? CollectDateMode.DAILY : existing.getCollectDateMode();
CollectDateMode newCollectDateMode = etlSource.getCollectDateMode() == null ? CollectDateMode.DAILY : etlSource.getCollectDateMode();
boolean scheduleChanged = !Objects.equals(existing.getStartAt(), etlSource.getStartAt())
|| existing.isEnabled() != etlSource.isEnabled()
|| existingCollectDateMode != newCollectDateMode;

// 更新该采集源下所有采集任务的模板,这里主要考虑到可能调整了采集源的连接参数
// 如果连接串,账号,密码三者没变更,则不要更新任务模板
Expand Down Expand Up @@ -185,9 +194,13 @@ public EtlSource create(EtlSource etlSource)
etlSource.setDbType(parsed.getValue());
}

if (etlSource.getCollectDateMode() == null) {
etlSource.setCollectDateMode(CollectDateMode.DAILY);
}

EtlSource save = etlSourceRepo.save(etlSource);
// 新采集源创建时,默认创建一个同步任务
collectionScheduler.scheduleOrUpdateTask(etlSource);
collectionScheduler.scheduleOrUpdateTask(save);
return save;
}

Expand Down
Loading