| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502 |
- using Admin.NET.Plugin.AiDOP.Dto.DataPlatform;
- using Admin.NET.Plugin.AiDOP.Entity.DataPlatform;
- using Admin.NET.Plugin.AiDOP.Order;
- namespace Admin.NET.Plugin.AiDOP.DataPlatform;
- /// <summary>
- /// 数据中台同步配置中心 API。
- /// </summary>
- [ApiDescriptionSettings(Order = 323, Description = "数据中台同步配置中心")]
- [Route("api/DataPlatform")]
- [AllowAnonymous]
- [NonUnify]
- public class MdpSyncTaskConfigService : IDynamicApiController, ITransient
- {
- private static readonly HashSet<string> ProtectedTaskCodes = new(StringComparer.OrdinalIgnoreCase)
- {
- "S1_MDP_SYNC_TRANSFORM",
- "S2_MDP_SYNC_TRANSFORM",
- "S3_MDP_SYNC_TRANSFORM",
- "S4_MDP_SYNC_TRANSFORM"
- };
- private readonly ISqlSugarClient _db;
- private readonly UserManager _userManager;
- public MdpSyncTaskConfigService(ISqlSugarClient db, UserManager userManager)
- {
- _db = db;
- _userManager = userManager;
- }
- [DisplayName("同步任务列表")]
- [HttpGet("sync-tasks")]
- public async Task<object> GetList([FromQuery] MdpSyncTaskListQuery input)
- {
- var tenantId = _userManager.TenantId;
- var page = input.Page <= 0 ? 1 : input.Page;
- var pageSize = input.PageSize <= 0 ? 10 : input.PageSize;
- var offset = (page - 1) * pageSize;
- var where = new List<string> { "(t.tenant_id = @TenantId OR t.tenant_id = 0)", "t.status = 1" };
- var pars = new List<SugarParameter> { new("@TenantId", tenantId) };
- if (!string.IsNullOrWhiteSpace(input.Keyword))
- {
- where.Add("(t.task_code LIKE @Keyword OR t.task_name LIKE @Keyword OR t.business_domain_name LIKE @Keyword)");
- pars.Add(new SugarParameter("@Keyword", $"%{input.Keyword.Trim()}%"));
- }
- if (!string.IsNullOrWhiteSpace(input.BusinessDomainCode))
- {
- where.Add("t.business_domain_code = @BusinessDomainCode");
- pars.Add(new SugarParameter("@BusinessDomainCode", input.BusinessDomainCode.Trim()));
- }
- if (input.Status.HasValue)
- {
- where[1] = "t.status = @Status";
- pars.Add(new SugarParameter("@Status", input.Status.Value));
- }
- var whereSql = string.Join(" AND ", where);
- var total = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM mdp_sync_task t WHERE {whereSql}", pars);
- var list = await _db.Ado.SqlQueryAsync<MdpSyncTaskListRow>(
- $"""
- SELECT t.id AS Id, t.tenant_id AS TenantId, t.task_code AS TaskCode, t.task_name AS TaskName,
- t.task_type AS TaskType, t.business_domain_code AS BusinessDomainCode,
- t.business_domain_name AS BusinessDomainName, t.consumer_modules AS ConsumerModules,
- t.source_system_code AS SourceSystemCode, t.service_key AS ServiceKey,
- t.job_code AS JobCode, t.schedule_job_id AS ScheduleJobId, t.status AS Status,
- t.config_version AS ConfigVersion, t.description AS Description,
- sch.schedule_mode AS ScheduleMode,
- IFNULL(sch.auto_enabled, 1) AS AutoEnabled,
- IFNULL(sch.manual_enabled, 1) AS ManualEnabled,
- run.status AS LastRunStatus, run.start_time AS LastRunTime
- FROM mdp_sync_task t
- LEFT JOIN mdp_sync_task_schedule sch
- ON sch.tenant_id = t.tenant_id AND sch.task_code = t.task_code
- LEFT JOIN (
- SELECT job_code, MAX(start_time) AS last_run_time
- FROM mdp_transform_run_log
- WHERE {MdpMonitorService.BuildMdpRunLogTenantWhere(tenantId)}
- GROUP BY job_code
- ) latest ON latest.job_code = IFNULL(t.job_code, t.task_code)
- LEFT JOIN mdp_transform_run_log run
- ON run.job_code = latest.job_code AND run.start_time = latest.last_run_time
- WHERE {whereSql}
- ORDER BY t.task_code
- LIMIT {pageSize} OFFSET {offset}
- """,
- pars);
- return new { total, page, pageSize, list };
- }
- [DisplayName("同步任务详情")]
- [HttpGet("sync-tasks/{id:long}")]
- public async Task<object> GetDetail(long id)
- {
- var tenantId = _userManager.TenantId;
- var row = await _db.Ado.SqlQuerySingleAsync<MdpSyncTaskListRow>(
- $"""
- SELECT t.id AS Id, t.tenant_id AS TenantId, t.task_code AS TaskCode, t.task_name AS TaskName,
- t.task_type AS TaskType, t.business_domain_code AS BusinessDomainCode,
- t.business_domain_name AS BusinessDomainName, t.consumer_modules AS ConsumerModules,
- t.source_system_code AS SourceSystemCode, t.service_key AS ServiceKey,
- t.job_code AS JobCode, t.schedule_job_id AS ScheduleJobId, t.status AS Status,
- t.config_version AS ConfigVersion, t.description AS Description,
- sch.schedule_mode AS ScheduleMode,
- IFNULL(sch.auto_enabled, 1) AS AutoEnabled,
- IFNULL(sch.manual_enabled, 1) AS ManualEnabled,
- run.status AS LastRunStatus, run.start_time AS LastRunTime
- FROM mdp_sync_task t
- LEFT JOIN mdp_sync_task_schedule sch
- ON sch.tenant_id = t.tenant_id AND sch.task_code = t.task_code
- LEFT JOIN (
- SELECT job_code, MAX(start_time) AS last_run_time
- FROM mdp_transform_run_log
- WHERE {MdpMonitorService.BuildMdpRunLogTenantWhere(tenantId)}
- GROUP BY job_code
- ) latest ON latest.job_code = IFNULL(t.job_code, t.task_code)
- LEFT JOIN mdp_transform_run_log run
- ON run.job_code = latest.job_code AND run.start_time = latest.last_run_time
- WHERE t.id = @Id AND (t.tenant_id = @TenantId OR t.tenant_id = 0)
- LIMIT 1
- """,
- new SugarParameter("@Id", id),
- new SugarParameter("@TenantId", tenantId));
- if (row == null)
- throw Oops.Oh("同步任务不存在");
- return row;
- }
- [DisplayName("新增同步任务")]
- [HttpPost("sync-tasks")]
- public async Task<object> Create([FromBody] MdpSyncTaskUpsertInput input)
- {
- ValidateUpsert(input);
- var tenantId = _userManager.TenantId;
- var exists = await _db.Queryable<MdpSyncTask>()
- .Where(u => u.TenantId == tenantId && u.TaskCode == input.TaskCode.Trim())
- .AnyAsync();
- if (exists)
- throw Oops.Oh("任务编码已存在");
- var entity = MapUpsert(input, tenantId);
- entity.ConfigVersion = 1;
- entity.CreateTime = DateTime.Now;
- entity.UpdateTime = DateTime.Now;
- var id = await _db.Insertable(entity).ExecuteReturnBigIdentityAsync();
- await EnsureDefaultScheduleAsync(tenantId, entity.TaskCode, entity.ScheduleJobId);
- return new { id };
- }
- [DisplayName("更新同步任务")]
- [HttpPut("sync-tasks/{id:long}")]
- public async Task<object> Update(long id, [FromBody] MdpSyncTaskUpsertInput input)
- {
- ValidateUpsert(input);
- var tenantId = _userManager.TenantId;
- var entity = await FindEditableTaskAsync(id, tenantId);
- entity.TaskName = input.TaskName.Trim();
- entity.TaskType = string.IsNullOrWhiteSpace(input.TaskType) ? "SERVICE_SYNC" : input.TaskType.Trim();
- entity.BusinessDomainCode = input.BusinessDomainCode?.Trim();
- entity.BusinessDomainName = input.BusinessDomainName?.Trim();
- entity.ConsumerModules = input.ConsumerModules?.Trim();
- entity.SourceSystemCode = input.SourceSystemCode?.Trim();
- entity.ServiceKey = input.ServiceKey?.Trim();
- entity.JobCode = string.IsNullOrWhiteSpace(input.JobCode) ? input.TaskCode.Trim() : input.JobCode.Trim();
- entity.ScheduleJobId = input.ScheduleJobId?.Trim();
- entity.Status = input.Status <= 0 ? 0 : 1;
- entity.OwnerRole = input.OwnerRole?.Trim();
- entity.Description = input.Description?.Trim();
- entity.ConfigVersion += 1;
- entity.UpdateTime = DateTime.Now;
- if (!ProtectedTaskCodes.Contains(entity.TaskCode))
- entity.TaskCode = input.TaskCode.Trim();
- await _db.Updateable(entity).ExecuteCommandAsync();
- await SyncScheduleJobIdAsync(entity.TenantId, entity.TaskCode, entity.ScheduleJobId);
- return new { id = entity.Id };
- }
- [DisplayName("删除同步任务")]
- [HttpDelete("sync-tasks/{id:long}")]
- public async Task<object> Delete(long id)
- {
- var tenantId = _userManager.TenantId;
- var entity = await FindEditableTaskAsync(id, tenantId);
- if (ProtectedTaskCodes.Contains(entity.TaskCode) && entity.TenantId == 0)
- throw Oops.Oh("内置 MDP 同步任务不允许删除");
- entity.Status = 0;
- entity.UpdateTime = DateTime.Now;
- await _db.Updateable(entity).UpdateColumns(u => new { u.Status, u.UpdateTime }).ExecuteCommandAsync();
- return new { id };
- }
- [DisplayName("同步任务步骤列表")]
- [HttpGet("sync-tasks/{taskCode}/steps")]
- public async Task<object> GetSteps(string taskCode)
- {
- var tenantId = _userManager.TenantId;
- await EnsureTaskExistsAsync(taskCode, tenantId);
- var steps = await _db.Queryable<MdpSyncTaskStep>()
- .Where(u => u.TaskCode == taskCode && (u.TenantId == tenantId || u.TenantId == 0))
- .OrderBy(u => u.SortOrder)
- .ToListAsync();
- return steps.Select(MapStepRow).ToList();
- }
- [DisplayName("保存同步任务步骤")]
- [HttpPut("sync-tasks/{taskCode}/steps")]
- public async Task<object> SaveSteps(string taskCode, [FromBody] List<MdpSyncTaskStepRow> input)
- {
- var tenantId = _userManager.TenantId;
- var task = await EnsureTaskExistsAsync(taskCode, tenantId);
- var rows = input ?? new List<MdpSyncTaskStepRow>();
- var now = DateTime.Now;
- foreach (var row in rows)
- {
- if (string.IsNullOrWhiteSpace(row.StepCode) || string.IsNullOrWhiteSpace(row.StepName))
- throw Oops.Oh("步骤编码和名称不能为空");
- var entity = await _db.Queryable<MdpSyncTaskStep>()
- .Where(u => u.TaskCode == taskCode && u.StepCode == row.StepCode && (u.TenantId == tenantId || u.TenantId == 0))
- .FirstAsync();
- if (entity == null)
- {
- entity = new MdpSyncTaskStep
- {
- TenantId = task.TenantId == 0 ? 0 : tenantId,
- TaskCode = taskCode,
- StepCode = row.StepCode.Trim(),
- StepName = row.StepName.Trim(),
- StageType = string.IsNullOrWhiteSpace(row.StageType) ? row.StepCode.Trim() : row.StageType.Trim(),
- ServiceMethodKey = row.ServiceMethodKey?.Trim(),
- Enabled = row.Enabled ? 1 : 0,
- SortOrder = row.SortOrder,
- Description = row.Description?.Trim(),
- CreateTime = now,
- UpdateTime = now
- };
- await _db.Insertable(entity).ExecuteCommandAsync();
- }
- else
- {
- entity.StepName = row.StepName.Trim();
- entity.StageType = string.IsNullOrWhiteSpace(row.StageType) ? row.StepCode.Trim() : row.StageType.Trim();
- entity.ServiceMethodKey = row.ServiceMethodKey?.Trim();
- entity.Enabled = row.Enabled ? 1 : 0;
- entity.SortOrder = row.SortOrder;
- entity.Description = row.Description?.Trim();
- entity.UpdateTime = now;
- await _db.Updateable(entity).ExecuteCommandAsync();
- }
- }
- task.ConfigVersion += 1;
- task.UpdateTime = now;
- await _db.Updateable(task).UpdateColumns(u => new { u.ConfigVersion, u.UpdateTime }).ExecuteCommandAsync();
- return new { count = rows.Count };
- }
- [DisplayName("同步任务调度配置")]
- [HttpGet("sync-tasks/{taskCode}/schedule")]
- public async Task<object> GetSchedule(string taskCode)
- {
- var tenantId = _userManager.TenantId;
- await EnsureTaskExistsAsync(taskCode, tenantId);
- var schedule = await FindScheduleAsync(taskCode, tenantId);
- if (schedule == null)
- return new MdpSyncTaskScheduleRow { TaskCode = taskCode, ScheduleMode = "CRON" };
- return MapScheduleRow(schedule);
- }
- [DisplayName("保存同步任务调度配置")]
- [HttpPut("sync-tasks/{taskCode}/schedule")]
- public async Task<object> SaveSchedule(string taskCode, [FromBody] MdpSyncTaskScheduleRow input)
- {
- var tenantId = _userManager.TenantId;
- var task = await EnsureTaskExistsAsync(taskCode, tenantId);
- var schedule = await FindScheduleAsync(taskCode, tenantId);
- var now = DateTime.Now;
- if (schedule == null)
- {
- schedule = new MdpSyncTaskSchedule
- {
- TenantId = task.TenantId == 0 ? 0 : tenantId,
- TaskCode = taskCode,
- CreateTime = now,
- UpdateTime = now
- };
- }
- schedule.ScheduleJobId = input.ScheduleJobId?.Trim() ?? task.ScheduleJobId;
- schedule.ScheduleMode = string.IsNullOrWhiteSpace(input.ScheduleMode) ? "CRON" : input.ScheduleMode.Trim();
- schedule.CronExpr = input.CronExpr?.Trim();
- schedule.CronDesc = input.CronDesc?.Trim();
- schedule.Timezone = string.IsNullOrWhiteSpace(input.Timezone) ? "Asia/Shanghai" : input.Timezone.Trim();
- schedule.AutoEnabled = input.AutoEnabled ? 1 : 0;
- schedule.ManualEnabled = input.ManualEnabled ? 1 : 0;
- schedule.RetryEnabled = input.RetryEnabled ? 1 : 0;
- schedule.MaxRetryCount = input.MaxRetryCount <= 0 ? 3 : input.MaxRetryCount;
- schedule.RetryIntervalSeconds = input.RetryIntervalSeconds <= 0 ? 300 : input.RetryIntervalSeconds;
- schedule.TimeoutSeconds = input.TimeoutSeconds <= 0 ? 3600 : input.TimeoutSeconds;
- schedule.MisfirePolicy = input.MisfirePolicy?.Trim();
- schedule.SyncWindowType = string.IsNullOrWhiteSpace(input.SyncWindowType) ? "FULL" : input.SyncWindowType.Trim();
- schedule.SyncWindowValue = input.SyncWindowValue?.Trim();
- schedule.LastScheduleTime = input.LastScheduleTime;
- schedule.NextScheduleTime = input.NextScheduleTime;
- schedule.AdminJobConfigJson = input.AdminJobConfigJson;
- schedule.Description = input.Description?.Trim();
- schedule.UpdateTime = now;
- if (schedule.Id <= 0)
- await _db.Insertable(schedule).ExecuteCommandAsync();
- else
- await _db.Updateable(schedule).ExecuteCommandAsync();
- task.ScheduleJobId = schedule.ScheduleJobId;
- task.ConfigVersion += 1;
- task.UpdateTime = now;
- await _db.Updateable(task).UpdateColumns(u => new { u.ScheduleJobId, u.ConfigVersion, u.UpdateTime }).ExecuteCommandAsync();
- return MapScheduleRow(schedule);
- }
- [DisplayName("同步 Admin.NET 调度快照")]
- [HttpPost("sync-tasks/{taskCode}/schedule/sync-admin-job")]
- public async Task<object> SyncAdminJob(string taskCode)
- {
- var tenantId = _userManager.TenantId;
- var task = await EnsureTaskExistsAsync(taskCode, tenantId);
- var schedule = await FindScheduleAsync(taskCode, tenantId) ?? new MdpSyncTaskSchedule
- {
- TenantId = task.TenantId == 0 ? 0 : tenantId,
- TaskCode = taskCode,
- ScheduleJobId = task.ScheduleJobId,
- ScheduleMode = "CRON",
- CreateTime = DateTime.Now,
- UpdateTime = DateTime.Now
- };
- schedule.AdminJobConfigJson = System.Text.Json.JsonSerializer.Serialize(new
- {
- scheduleJobId = schedule.ScheduleJobId ?? task.ScheduleJobId,
- message = "第一阶段仅保存 Admin.NET Job 绑定快照,不写回底层 Cron 配置。"
- });
- schedule.UpdateTime = DateTime.Now;
- if (schedule.Id <= 0)
- await _db.Insertable(schedule).ExecuteCommandAsync();
- else
- await _db.Updateable(schedule).UpdateColumns(u => new { u.AdminJobConfigJson, u.UpdateTime }).ExecuteCommandAsync();
- return new
- {
- taskCode,
- scheduleJobId = schedule.ScheduleJobId ?? task.ScheduleJobId,
- adminJobConfigJson = schedule.AdminJobConfigJson
- };
- }
- private async Task<MdpSyncTask> EnsureTaskExistsAsync(string taskCode, long tenantId)
- {
- if (string.IsNullOrWhiteSpace(taskCode))
- throw Oops.Oh("任务编码不能为空");
- var task = await _db.Queryable<MdpSyncTask>()
- .Where(u => u.TaskCode == taskCode && (u.TenantId == tenantId || u.TenantId == 0) && u.Status == 1)
- .OrderBy(u => u.TenantId, OrderByType.Desc)
- .FirstAsync();
- if (task == null)
- throw Oops.Oh("同步任务不存在");
- return task;
- }
- private async Task<MdpSyncTask> FindEditableTaskAsync(long id, long tenantId)
- {
- var entity = await _db.Queryable<MdpSyncTask>()
- .Where(u => u.Id == id && (u.TenantId == tenantId || u.TenantId == 0))
- .FirstAsync();
- if (entity == null)
- throw Oops.Oh("同步任务不存在");
- return entity;
- }
- private async Task<MdpSyncTaskSchedule?> FindScheduleAsync(string taskCode, long tenantId) =>
- await _db.Queryable<MdpSyncTaskSchedule>()
- .Where(u => u.TaskCode == taskCode && (u.TenantId == tenantId || u.TenantId == 0))
- .OrderBy(u => u.TenantId, OrderByType.Desc)
- .FirstAsync();
- private async Task EnsureDefaultScheduleAsync(long tenantId, string taskCode, string? scheduleJobId)
- {
- var exists = await _db.Queryable<MdpSyncTaskSchedule>()
- .Where(u => u.TaskCode == taskCode && (u.TenantId == tenantId || u.TenantId == 0))
- .AnyAsync();
- if (exists)
- return;
- await _db.Insertable(new MdpSyncTaskSchedule
- {
- TenantId = tenantId,
- TaskCode = taskCode,
- ScheduleJobId = scheduleJobId,
- ScheduleMode = "CRON",
- CronDesc = "按 Admin.NET 任务调度执行",
- AutoEnabled = 1,
- ManualEnabled = 1,
- RetryEnabled = 1,
- MaxRetryCount = 3,
- RetryIntervalSeconds = 300,
- TimeoutSeconds = 3600,
- SyncWindowType = "FULL",
- CreateTime = DateTime.Now,
- UpdateTime = DateTime.Now
- }).ExecuteCommandAsync();
- }
- private async Task SyncScheduleJobIdAsync(long tenantId, string taskCode, string? scheduleJobId)
- {
- var schedule = await FindScheduleAsync(taskCode, tenantId);
- if (schedule == null)
- {
- await EnsureDefaultScheduleAsync(tenantId, taskCode, scheduleJobId);
- return;
- }
- schedule.ScheduleJobId = scheduleJobId;
- schedule.UpdateTime = DateTime.Now;
- await _db.Updateable(schedule).UpdateColumns(u => new { u.ScheduleJobId, u.UpdateTime }).ExecuteCommandAsync();
- }
- private static void ValidateUpsert(MdpSyncTaskUpsertInput input)
- {
- if (string.IsNullOrWhiteSpace(input.TaskCode))
- throw Oops.Oh("任务编码不能为空");
- if (string.IsNullOrWhiteSpace(input.TaskName))
- throw Oops.Oh("任务名称不能为空");
- }
- private static MdpSyncTask MapUpsert(MdpSyncTaskUpsertInput input, long tenantId) =>
- new()
- {
- TenantId = tenantId,
- TaskCode = input.TaskCode.Trim(),
- TaskName = input.TaskName.Trim(),
- TaskType = string.IsNullOrWhiteSpace(input.TaskType) ? "SERVICE_SYNC" : input.TaskType.Trim(),
- BusinessDomainCode = input.BusinessDomainCode?.Trim(),
- BusinessDomainName = input.BusinessDomainName?.Trim(),
- ConsumerModules = input.ConsumerModules?.Trim(),
- SourceSystemCode = input.SourceSystemCode?.Trim(),
- ServiceKey = input.ServiceKey?.Trim(),
- JobCode = string.IsNullOrWhiteSpace(input.JobCode) ? input.TaskCode.Trim() : input.JobCode.Trim(),
- ScheduleJobId = input.ScheduleJobId?.Trim(),
- Status = input.Status <= 0 ? 0 : 1,
- OwnerRole = input.OwnerRole?.Trim(),
- Description = input.Description?.Trim()
- };
- private static MdpSyncTaskStepRow MapStepRow(MdpSyncTaskStep step) =>
- new()
- {
- Id = step.Id,
- StepCode = step.StepCode,
- StepName = step.StepName,
- StageType = step.StageType,
- ServiceMethodKey = step.ServiceMethodKey,
- Enabled = step.Enabled != 0,
- SortOrder = step.SortOrder,
- Description = step.Description
- };
- private static MdpSyncTaskScheduleRow MapScheduleRow(MdpSyncTaskSchedule schedule) =>
- new()
- {
- TaskCode = schedule.TaskCode,
- ScheduleJobId = schedule.ScheduleJobId,
- ScheduleMode = schedule.ScheduleMode,
- CronExpr = schedule.CronExpr,
- CronDesc = schedule.CronDesc,
- Timezone = schedule.Timezone,
- AutoEnabled = schedule.AutoEnabled != 0,
- ManualEnabled = schedule.ManualEnabled != 0,
- RetryEnabled = schedule.RetryEnabled != 0,
- MaxRetryCount = schedule.MaxRetryCount,
- RetryIntervalSeconds = schedule.RetryIntervalSeconds,
- TimeoutSeconds = schedule.TimeoutSeconds,
- MisfirePolicy = schedule.MisfirePolicy,
- SyncWindowType = schedule.SyncWindowType,
- SyncWindowValue = schedule.SyncWindowValue,
- LastScheduleTime = schedule.LastScheduleTime,
- NextScheduleTime = schedule.NextScheduleTime,
- AdminJobConfigJson = schedule.AdminJobConfigJson,
- Description = schedule.Description
- };
- }
|