using Admin.NET.Plugin.AiDOP.Dto.DataPlatform;
using Admin.NET.Plugin.AiDOP.Entity.DataPlatform;
using Admin.NET.Plugin.AiDOP.Order;

namespace Admin.NET.Plugin.AiDOP.DataPlatform;

/// <summary>
/// Data platform (MDP) sync configuration center API: CRUD for sync tasks plus
/// their steps and schedule configuration, scoped to the current tenant with
/// tenant 0 rows treated as shared.
/// </summary>
/// <remarks>
/// NOTE(review): several declarations in this file are written without generic
/// type arguments (HashSet, List, Task, Queryable, SqlQueryAsync). They are
/// reproduced exactly as found because the intended row types cannot all be
/// determined from this file alone - confirm against the DTO/entity projects.
/// </remarks>
[ApiDescriptionSettings(Order = 323, Description = "数据中台同步配置中心")]
[Route("api/DataPlatform")]
[AllowAnonymous]
[NonUnify]
public class MdpSyncTaskConfigService : IDynamicApiController, ITransient
{
    /// <summary>
    /// Task codes that keep their code on update and, when owned by tenant 0,
    /// cannot be deleted. Compared case-insensitively.
    /// </summary>
    private static readonly HashSet ProtectedTaskCodes = new(StringComparer.OrdinalIgnoreCase)
    {
        "S1_MDP_SYNC_TRANSFORM",
        "S2_MDP_SYNC_TRANSFORM",
        "S3_MDP_SYNC_TRANSFORM",
        "S4_MDP_SYNC_TRANSFORM"
    };

    private readonly ISqlSugarClient _db;
    private readonly UserManager _userManager;

    public MdpSyncTaskConfigService(ISqlSugarClient db, UserManager userManager)
    {
        _db = db;
        _userManager = userManager;
    }

    /// <summary>
    /// Returns one page of sync tasks visible to the current tenant (own rows plus tenant 0 rows).
    /// </summary>
    /// <param name="input">
    /// Paging and filter options. Page defaults to 1 and PageSize to 10 when not positive.
    /// Status defaults to 1 when omitted.
    /// </param>
    /// <returns>An object with <c>total</c>, <c>page</c>, <c>pageSize</c> and <c>list</c>.</returns>
    [DisplayName("同步任务列表")]
    [HttpGet("sync-tasks")]
    public async Task GetList([FromQuery] MdpSyncTaskListQuery input)
    {
        var tenantId = _userManager.TenantId;
        var page = input.Page <= 0 ? 1 : input.Page;
        var pageSize = input.PageSize <= 0 ? 10 : input.PageSize;
        // Computed as long so a very large page/pageSize cannot overflow into a negative OFFSET.
        var offset = (long)(page - 1) * pageSize;

        var conditions = new List { "(t.tenant_id = @TenantId OR t.tenant_id = 0)" };
        var pars = new List { new("@TenantId", tenantId) };

        // Status filter: explicit value when supplied, otherwise only enabled rows.
        if (input.Status.HasValue)
        {
            conditions.Add("t.status = @Status");
            pars.Add(new SugarParameter("@Status", input.Status.Value));
        }
        else
        {
            conditions.Add("t.status = 1");
        }

        if (!string.IsNullOrWhiteSpace(input.Keyword))
        {
            conditions.Add("(t.task_code LIKE @Keyword OR t.task_name LIKE @Keyword OR t.business_domain_name LIKE @Keyword)");
            pars.Add(new SugarParameter("@Keyword", $"%{input.Keyword.Trim()}%"));
        }

        if (!string.IsNullOrWhiteSpace(input.BusinessDomainCode))
        {
            conditions.Add("t.business_domain_code = @BusinessDomainCode");
            pars.Add(new SugarParameter("@BusinessDomainCode", input.BusinessDomainCode.Trim()));
        }

        var whereSql = string.Join(" AND ", conditions);
        var total = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM mdp_sync_task t WHERE {whereSql}", pars);

        // pageSize and offset are numeric locals, so interpolating them into the SQL text is safe.
        var list = await _db.Ado.SqlQueryAsync(
            $"""
            {BuildTaskSelectSql(tenantId)}
            WHERE {whereSql}
            ORDER BY t.task_code
            LIMIT {pageSize} OFFSET {offset}
            """,
            pars);

        return new { total, page, pageSize, list };
    }

    /// <summary>
    /// Returns a single sync task by id, visible when it belongs to the current tenant or tenant 0.
    /// </summary>
    /// <param name="id">Primary key of the task.</param>
    /// <returns>The task row with schedule flags and latest run status.</returns>
    /// <remarks>Throws via <c>Oops.Oh</c> when no matching row exists.</remarks>
    [DisplayName("同步任务详情")]
    [HttpGet("sync-tasks/{id:long}")]
    public async Task GetDetail(long id)
    {
        var tenantId = _userManager.TenantId;
        var row = await _db.Ado.SqlQuerySingleAsync(
            $"""
            {BuildTaskSelectSql(tenantId)}
            WHERE t.id = @Id AND (t.tenant_id = @TenantId OR t.tenant_id = 0)
            LIMIT 1
            """,
            new SugarParameter("@Id", id),
            new SugarParameter("@TenantId", tenantId));

        if (row == null)
            throw Oops.Oh("同步任务不存在");

        return row;
    }

    /// <summary>
    /// Creates a sync task for the current tenant and makes sure a default schedule row exists for it.
    /// </summary>
    /// <param name="input">Task fields; TaskCode and TaskName are required.</param>
    /// <returns>An object carrying the new <c>id</c>.</returns>
    /// <remarks>Throws via <c>Oops.Oh</c> when validation fails or the code already exists for this tenant.</remarks>
    [DisplayName("新增同步任务")]
    [HttpPost("sync-tasks")]
    public async Task Create([FromBody] MdpSyncTaskUpsertInput input)
    {
        ValidateUpsert(input);

        var tenantId = _userManager.TenantId;
        var taskCode = input.TaskCode.Trim();
        var exists = await _db.Queryable()
            .Where(u => u.TenantId == tenantId && u.TaskCode == taskCode)
            .AnyAsync();
        if (exists)
            throw Oops.Oh("任务编码已存在");

        var now = DateTime.Now;
        var entity = MapUpsert(input, tenantId);
        entity.ConfigVersion = 1;
        entity.CreateTime = now;
        entity.UpdateTime = now;

        var id = await _db.Insertable(entity).ExecuteReturnBigIdentityAsync();
        await EnsureDefaultScheduleAsync(tenantId, entity.TaskCode, entity.ScheduleJobId);
        return new { id };
    }

    /// <summary>
    /// Updates a sync task, bumps its config version and propagates the schedule job id to its schedule row.
    /// </summary>
    /// <param name="id">Primary key of the task to update.</param>
    /// <param name="input">New task fields; TaskCode and TaskName are required.</param>
    /// <returns>An object carrying the task <c>id</c>.</returns>
    [DisplayName("更新同步任务")]
    [HttpPut("sync-tasks/{id:long}")]
    public async Task Update(long id, [FromBody] MdpSyncTaskUpsertInput input)
    {
        ValidateUpsert(input);

        var tenantId = _userManager.TenantId;
        var entity = await FindEditableTaskAsync(id, tenantId);

        // Protected tasks keep their code; everything else takes the submitted one.
        // NOTE(review): a rename here is not re-checked for uniqueness and does not rename
        // step/schedule rows keyed by the old code - confirm whether renames should be allowed.
        if (!ProtectedTaskCodes.Contains(entity.TaskCode))
            entity.TaskCode = input.TaskCode.Trim();

        entity.TaskName = input.TaskName.Trim();
        entity.TaskType = string.IsNullOrWhiteSpace(input.TaskType) ? "SERVICE_SYNC" : input.TaskType.Trim();
        entity.BusinessDomainCode = input.BusinessDomainCode?.Trim();
        entity.BusinessDomainName = input.BusinessDomainName?.Trim();
        entity.ConsumerModules = input.ConsumerModules?.Trim();
        entity.SourceSystemCode = input.SourceSystemCode?.Trim();
        entity.ServiceKey = input.ServiceKey?.Trim();
        // Default the job code to the code the task actually ends up with, so a rejected
        // rename of a protected task cannot leak into JobCode.
        entity.JobCode = string.IsNullOrWhiteSpace(input.JobCode) ? entity.TaskCode : input.JobCode.Trim();
        entity.ScheduleJobId = input.ScheduleJobId?.Trim();
        entity.Status = input.Status <= 0 ? 0 : 1;
        entity.OwnerRole = input.OwnerRole?.Trim();
        entity.Description = input.Description?.Trim();
        entity.ConfigVersion += 1;
        entity.UpdateTime = DateTime.Now;

        await _db.Updateable(entity).ExecuteCommandAsync();
        await SyncScheduleJobIdAsync(entity.TenantId, entity.TaskCode, entity.ScheduleJobId);
        return new { id = entity.Id };
    }

    /// <summary>
    /// Soft-deletes a sync task by setting its status to 0.
    /// </summary>
    /// <param name="id">Primary key of the task.</param>
    /// <returns>An object carrying the task <c>id</c>.</returns>
    /// <remarks>Throws via <c>Oops.Oh</c> for protected task codes owned by tenant 0.</remarks>
    [DisplayName("删除同步任务")]
    [HttpDelete("sync-tasks/{id:long}")]
    public async Task Delete(long id)
    {
        var tenantId = _userManager.TenantId;
        var entity = await FindEditableTaskAsync(id, tenantId);

        if (ProtectedTaskCodes.Contains(entity.TaskCode) && entity.TenantId == 0)
            throw Oops.Oh("内置 MDP 同步任务不允许删除");

        entity.Status = 0;
        entity.UpdateTime = DateTime.Now;
        await _db.Updateable(entity)
            .UpdateColumns(u => new { u.Status, u.UpdateTime })
            .ExecuteCommandAsync();
        return new { id };
    }

    /// <summary>
    /// Lists the steps of a task (current tenant plus tenant 0 rows), ordered by SortOrder.
    /// </summary>
    /// <param name="taskCode">Code of an existing, enabled task.</param>
    [DisplayName("同步任务步骤列表")]
    [HttpGet("sync-tasks/{taskCode}/steps")]
    public async Task GetSteps(string taskCode)
    {
        var tenantId = _userManager.TenantId;
        await EnsureTaskExistsAsync(taskCode, tenantId);

        var steps = await _db.Queryable()
            .Where(u => u.TaskCode == taskCode && (u.TenantId == tenantId || u.TenantId == 0))
            .OrderBy(u => u.SortOrder)
            .ToListAsync();
        return steps.Select(MapStepRow).ToList();
    }

    /// <summary>
    /// Inserts or updates the supplied steps for a task (matched by step code) and bumps the task's config version.
    /// Steps not present in <paramref name="input"/> are left untouched.
    /// </summary>
    /// <param name="taskCode">Code of an existing, enabled task.</param>
    /// <param name="input">Step rows to save; a null list is treated as empty.</param>
    /// <returns>An object carrying the number of rows processed as <c>count</c>.</returns>
    [DisplayName("保存同步任务步骤")]
    [HttpPut("sync-tasks/{taskCode}/steps")]
    public async Task SaveSteps(string taskCode, [FromBody] List input)
    {
        var tenantId = _userManager.TenantId;
        var task = await EnsureTaskExistsAsync(taskCode, tenantId);
        var rows = input ?? new List();
        var now = DateTime.Now;

        // Validate everything before the first write so one bad row cannot leave a half-saved list.
        foreach (var row in rows)
        {
            if (row == null || string.IsNullOrWhiteSpace(row.StepCode) || string.IsNullOrWhiteSpace(row.StepName))
                throw Oops.Oh("步骤编码和名称不能为空");
        }

        foreach (var row in rows)
        {
            // Look up by the same trimmed code that gets stored; otherwise a padded code
            // never matches and a duplicate row is inserted on every save.
            var stepCode = row.StepCode.Trim();
            var stageType = string.IsNullOrWhiteSpace(row.StageType) ? stepCode : row.StageType.Trim();

            // Prefer the tenant's own row over the shared tenant 0 row, consistent with
            // FindScheduleAsync and EnsureTaskExistsAsync.
            var entity = await _db.Queryable()
                .Where(u => u.TaskCode == taskCode && u.StepCode == stepCode && (u.TenantId == tenantId || u.TenantId == 0))
                .OrderBy(u => u.TenantId, OrderByType.Desc)
                .FirstAsync();

            if (entity == null)
            {
                entity = new MdpSyncTaskStep
                {
                    // New steps inherit the owning tenant of the task (0 for shared tasks).
                    TenantId = task.TenantId == 0 ? 0 : tenantId,
                    TaskCode = taskCode,
                    StepCode = stepCode,
                    StepName = row.StepName.Trim(),
                    StageType = stageType,
                    ServiceMethodKey = row.ServiceMethodKey?.Trim(),
                    Enabled = row.Enabled ? 1 : 0,
                    SortOrder = row.SortOrder,
                    Description = row.Description?.Trim(),
                    CreateTime = now,
                    UpdateTime = now
                };
                await _db.Insertable(entity).ExecuteCommandAsync();
            }
            else
            {
                entity.StepName = row.StepName.Trim();
                entity.StageType = stageType;
                entity.ServiceMethodKey = row.ServiceMethodKey?.Trim();
                entity.Enabled = row.Enabled ? 1 : 0;
                entity.SortOrder = row.SortOrder;
                entity.Description = row.Description?.Trim();
                entity.UpdateTime = now;
                await _db.Updateable(entity).ExecuteCommandAsync();
            }
        }

        task.ConfigVersion += 1;
        task.UpdateTime = now;
        await _db.Updateable(task)
            .UpdateColumns(u => new { u.ConfigVersion, u.UpdateTime })
            .ExecuteCommandAsync();
        return new { count = rows.Count };
    }

    /// <summary>
    /// Returns the schedule configuration of a task, or a minimal CRON placeholder when none is stored.
    /// </summary>
    /// <param name="taskCode">Code of an existing, enabled task.</param>
    [DisplayName("同步任务调度配置")]
    [HttpGet("sync-tasks/{taskCode}/schedule")]
    public async Task GetSchedule(string taskCode)
    {
        var tenantId = _userManager.TenantId;
        await EnsureTaskExistsAsync(taskCode, tenantId);

        var schedule = await FindScheduleAsync(taskCode, tenantId);
        if (schedule == null)
            return new MdpSyncTaskScheduleRow { TaskCode = taskCode, ScheduleMode = "CRON" };

        return MapScheduleRow(schedule);
    }

    /// <summary>
    /// Creates or updates the schedule configuration of a task, then mirrors the schedule job id
    /// onto the task and bumps its config version.
    /// </summary>
    /// <param name="taskCode">Code of an existing, enabled task.</param>
    /// <param name="input">Schedule fields; blank or non-positive values fall back to the defaults applied below.</param>
    /// <returns>The saved schedule mapped back to its row DTO.</returns>
    [DisplayName("保存同步任务调度配置")]
    [HttpPut("sync-tasks/{taskCode}/schedule")]
    public async Task SaveSchedule(string taskCode, [FromBody] MdpSyncTaskScheduleRow input)
    {
        // A missing request body would otherwise surface as a NullReferenceException below.
        if (input == null)
            throw Oops.Oh("调度配置不能为空");

        var tenantId = _userManager.TenantId;
        var task = await EnsureTaskExistsAsync(taskCode, tenantId);
        var schedule = await FindScheduleAsync(taskCode, tenantId);
        var now = DateTime.Now;

        if (schedule == null)
        {
            schedule = new MdpSyncTaskSchedule
            {
                TenantId = task.TenantId == 0 ? 0 : tenantId,
                TaskCode = taskCode,
                CreateTime = now,
                UpdateTime = now
            };
        }

        schedule.ScheduleJobId = input.ScheduleJobId?.Trim() ?? task.ScheduleJobId;
        schedule.ScheduleMode = string.IsNullOrWhiteSpace(input.ScheduleMode) ? "CRON" : input.ScheduleMode.Trim();
        schedule.CronExpr = input.CronExpr?.Trim();
        schedule.CronDesc = input.CronDesc?.Trim();
        schedule.Timezone = string.IsNullOrWhiteSpace(input.Timezone) ? "Asia/Shanghai" : input.Timezone.Trim();
        schedule.AutoEnabled = input.AutoEnabled ? 1 : 0;
        schedule.ManualEnabled = input.ManualEnabled ? 1 : 0;
        schedule.RetryEnabled = input.RetryEnabled ? 1 : 0;
        schedule.MaxRetryCount = input.MaxRetryCount <= 0 ? 3 : input.MaxRetryCount;
        schedule.RetryIntervalSeconds = input.RetryIntervalSeconds <= 0 ? 300 : input.RetryIntervalSeconds;
        schedule.TimeoutSeconds = input.TimeoutSeconds <= 0 ? 3600 : input.TimeoutSeconds;
        schedule.MisfirePolicy = input.MisfirePolicy?.Trim();
        schedule.SyncWindowType = string.IsNullOrWhiteSpace(input.SyncWindowType) ? "FULL" : input.SyncWindowType.Trim();
        schedule.SyncWindowValue = input.SyncWindowValue?.Trim();
        schedule.LastScheduleTime = input.LastScheduleTime;
        schedule.NextScheduleTime = input.NextScheduleTime;
        schedule.AdminJobConfigJson = input.AdminJobConfigJson;
        schedule.Description = input.Description?.Trim();
        schedule.UpdateTime = now;

        // Id <= 0 means the row was built above and has not been persisted yet.
        if (schedule.Id <= 0)
            await _db.Insertable(schedule).ExecuteCommandAsync();
        else
            await _db.Updateable(schedule).ExecuteCommandAsync();

        task.ScheduleJobId = schedule.ScheduleJobId;
        task.ConfigVersion += 1;
        task.UpdateTime = now;
        await _db.Updateable(task)
            .UpdateColumns(u => new { u.ScheduleJobId, u.ConfigVersion, u.UpdateTime })
            .ExecuteCommandAsync();

        return MapScheduleRow(schedule);
    }

    /// <summary>
    /// Stores a JSON snapshot of the Admin.NET job binding on the task's schedule row
    /// (creating the row if needed). The underlying cron configuration is not written back.
    /// </summary>
    /// <param name="taskCode">Code of an existing, enabled task.</param>
    /// <returns>An object with <c>taskCode</c>, <c>scheduleJobId</c> and <c>adminJobConfigJson</c>.</returns>
    [DisplayName("同步 Admin.NET 调度快照")]
    [HttpPost("sync-tasks/{taskCode}/schedule/sync-admin-job")]
    public async Task SyncAdminJob(string taskCode)
    {
        var tenantId = _userManager.TenantId;
        var task = await EnsureTaskExistsAsync(taskCode, tenantId);
        var now = DateTime.Now;

        var schedule = await FindScheduleAsync(taskCode, tenantId) ?? new MdpSyncTaskSchedule
        {
            TenantId = task.TenantId == 0 ? 0 : tenantId,
            TaskCode = taskCode,
            ScheduleJobId = task.ScheduleJobId,
            ScheduleMode = "CRON",
            CreateTime = now,
            UpdateTime = now
        };

        var scheduleJobId = schedule.ScheduleJobId ?? task.ScheduleJobId;
        schedule.AdminJobConfigJson = System.Text.Json.JsonSerializer.Serialize(new
        {
            scheduleJobId,
            message = "第一阶段仅保存 Admin.NET Job 绑定快照,不写回底层 Cron 配置。"
        });
        schedule.UpdateTime = now;

        if (schedule.Id <= 0)
        {
            await _db.Insertable(schedule).ExecuteCommandAsync();
        }
        else
        {
            await _db.Updateable(schedule)
                .UpdateColumns(u => new { u.AdminJobConfigJson, u.UpdateTime })
                .ExecuteCommandAsync();
        }

        return new { taskCode, scheduleJobId, adminJobConfigJson = schedule.AdminJobConfigJson };
    }

    /// <summary>
    /// Builds the shared SELECT/JOIN part of the task list and detail queries: task columns,
    /// schedule flags and the status/start time of the latest run-log row per job code.
    /// Callers append their own WHERE / ORDER BY / LIMIT.
    /// </summary>
    /// <param name="tenantId">Tenant passed to the run-log tenant filter.</param>
    private static string BuildTaskSelectSql(long tenantId) =>
        $"""
        SELECT
            t.id AS Id,
            t.tenant_id AS TenantId,
            t.task_code AS TaskCode,
            t.task_name AS TaskName,
            t.task_type AS TaskType,
            t.business_domain_code AS BusinessDomainCode,
            t.business_domain_name AS BusinessDomainName,
            t.consumer_modules AS ConsumerModules,
            t.source_system_code AS SourceSystemCode,
            t.service_key AS ServiceKey,
            t.job_code AS JobCode,
            t.schedule_job_id AS ScheduleJobId,
            t.status AS Status,
            t.config_version AS ConfigVersion,
            t.description AS Description,
            sch.schedule_mode AS ScheduleMode,
            IFNULL(sch.auto_enabled, 1) AS AutoEnabled,
            IFNULL(sch.manual_enabled, 1) AS ManualEnabled,
            run.status AS LastRunStatus,
            run.start_time AS LastRunTime
        FROM mdp_sync_task t
        LEFT JOIN mdp_sync_task_schedule sch
            ON sch.tenant_id = t.tenant_id AND sch.task_code = t.task_code
        LEFT JOIN (
            SELECT job_code, MAX(start_time) AS last_run_time
            FROM mdp_transform_run_log
            WHERE {MdpMonitorService.BuildMdpRunLogTenantWhere(tenantId)}
            GROUP BY job_code
        ) latest ON latest.job_code = IFNULL(t.job_code, t.task_code)
        LEFT JOIN mdp_transform_run_log run
            ON run.job_code = latest.job_code AND run.start_time = latest.last_run_time
        """;

    /// <summary>
    /// Loads the enabled task with the given code, preferring the tenant's own row over the tenant 0 row.
    /// Throws via <c>Oops.Oh</c> when the code is blank or no such task exists.
    /// </summary>
    private async Task EnsureTaskExistsAsync(string taskCode, long tenantId)
    {
        if (string.IsNullOrWhiteSpace(taskCode))
            throw Oops.Oh("任务编码不能为空");

        var task = await _db.Queryable()
            .Where(u => u.TaskCode == taskCode && (u.TenantId == tenantId || u.TenantId == 0) && u.Status == 1)
            .OrderBy(u => u.TenantId, OrderByType.Desc)
            .FirstAsync();

        if (task == null)
            throw Oops.Oh("同步任务不存在");

        return task;
    }

    /// <summary>
    /// Loads a task by id regardless of status, limited to the current tenant or tenant 0.
    /// Throws via <c>Oops.Oh</c> when not found.
    /// </summary>
    private async Task FindEditableTaskAsync(long id, long tenantId)
    {
        var entity = await _db.Queryable()
            .Where(u => u.Id == id && (u.TenantId == tenantId || u.TenantId == 0))
            .FirstAsync();

        if (entity == null)
            throw Oops.Oh("同步任务不存在");

        return entity;
    }

    /// <summary>
    /// Returns the schedule row for a task code, preferring the tenant's own row over the tenant 0 row.
    /// The caller handles a null result.
    /// </summary>
    private async Task FindScheduleAsync(string taskCode, long tenantId) =>
        await _db.Queryable()
            .Where(u => u.TaskCode == taskCode && (u.TenantId == tenantId || u.TenantId == 0))
            .OrderBy(u => u.TenantId, OrderByType.Desc)
            .FirstAsync();

    /// <summary>
    /// Inserts a default CRON schedule row for the task unless a tenant or tenant 0 row already exists.
    /// </summary>
    private async Task EnsureDefaultScheduleAsync(long tenantId, string taskCode, string? scheduleJobId)
    {
        var exists = await _db.Queryable()
            .Where(u => u.TaskCode == taskCode && (u.TenantId == tenantId || u.TenantId == 0))
            .AnyAsync();
        if (exists)
            return;

        var now = DateTime.Now;
        await _db.Insertable(new MdpSyncTaskSchedule
        {
            TenantId = tenantId,
            TaskCode = taskCode,
            ScheduleJobId = scheduleJobId,
            ScheduleMode = "CRON",
            CronDesc = "按 Admin.NET 任务调度执行",
            AutoEnabled = 1,
            ManualEnabled = 1,
            RetryEnabled = 1,
            MaxRetryCount = 3,
            RetryIntervalSeconds = 300,
            TimeoutSeconds = 3600,
            SyncWindowType = "FULL",
            CreateTime = now,
            UpdateTime = now
        }).ExecuteCommandAsync();
    }

    /// <summary>
    /// Copies the task's schedule job id onto its schedule row, creating a default schedule row when none exists.
    /// </summary>
    private async Task SyncScheduleJobIdAsync(long tenantId, string taskCode, string? scheduleJobId)
    {
        var schedule = await FindScheduleAsync(taskCode, tenantId);
        if (schedule == null)
        {
            await EnsureDefaultScheduleAsync(tenantId, taskCode, scheduleJobId);
            return;
        }

        schedule.ScheduleJobId = scheduleJobId;
        schedule.UpdateTime = DateTime.Now;
        await _db.Updateable(schedule)
            .UpdateColumns(u => new { u.ScheduleJobId, u.UpdateTime })
            .ExecuteCommandAsync();
    }

    /// <summary>
    /// Rejects a missing request body or a blank TaskCode / TaskName via <c>Oops.Oh</c>.
    /// </summary>
    private static void ValidateUpsert(MdpSyncTaskUpsertInput input)
    {
        // Guard first: the property checks below would otherwise throw NullReferenceException.
        if (input == null)
            throw Oops.Oh("请求参数不能为空");
        if (string.IsNullOrWhiteSpace(input.TaskCode))
            throw Oops.Oh("任务编码不能为空");
        if (string.IsNullOrWhiteSpace(input.TaskName))
            throw Oops.Oh("任务名称不能为空");
    }

    /// <summary>
    /// Builds a new task entity from the upsert input: strings are trimmed, TaskType defaults to
    /// "SERVICE_SYNC", JobCode defaults to the task code, and Status is normalised to 0 or 1.
    /// </summary>
    private static MdpSyncTask MapUpsert(MdpSyncTaskUpsertInput input, long tenantId) => new()
    {
        TenantId = tenantId,
        TaskCode = input.TaskCode.Trim(),
        TaskName = input.TaskName.Trim(),
        TaskType = string.IsNullOrWhiteSpace(input.TaskType) ? "SERVICE_SYNC" : input.TaskType.Trim(),
        BusinessDomainCode = input.BusinessDomainCode?.Trim(),
        BusinessDomainName = input.BusinessDomainName?.Trim(),
        ConsumerModules = input.ConsumerModules?.Trim(),
        SourceSystemCode = input.SourceSystemCode?.Trim(),
        ServiceKey = input.ServiceKey?.Trim(),
        JobCode = string.IsNullOrWhiteSpace(input.JobCode) ? input.TaskCode.Trim() : input.JobCode.Trim(),
        ScheduleJobId = input.ScheduleJobId?.Trim(),
        Status = input.Status <= 0 ? 0 : 1,
        OwnerRole = input.OwnerRole?.Trim(),
        Description = input.Description?.Trim()
    };

    /// <summary>
    /// Maps a step entity to its row DTO; the numeric Enabled flag becomes a boolean (non-zero = true).
    /// </summary>
    private static MdpSyncTaskStepRow MapStepRow(MdpSyncTaskStep step) => new()
    {
        Id = step.Id,
        StepCode = step.StepCode,
        StepName = step.StepName,
        StageType = step.StageType,
        ServiceMethodKey = step.ServiceMethodKey,
        Enabled = step.Enabled != 0,
        SortOrder = step.SortOrder,
        Description = step.Description
    };

    /// <summary>
    /// Maps a schedule entity to its row DTO; the numeric enabled flags become booleans (non-zero = true).
    /// </summary>
    private static MdpSyncTaskScheduleRow MapScheduleRow(MdpSyncTaskSchedule schedule) => new()
    {
        TaskCode = schedule.TaskCode,
        ScheduleJobId = schedule.ScheduleJobId,
        ScheduleMode = schedule.ScheduleMode,
        CronExpr = schedule.CronExpr,
        CronDesc = schedule.CronDesc,
        Timezone = schedule.Timezone,
        AutoEnabled = schedule.AutoEnabled != 0,
        ManualEnabled = schedule.ManualEnabled != 0,
        RetryEnabled = schedule.RetryEnabled != 0,
        MaxRetryCount = schedule.MaxRetryCount,
        RetryIntervalSeconds = schedule.RetryIntervalSeconds,
        TimeoutSeconds = schedule.TimeoutSeconds,
        MisfirePolicy = schedule.MisfirePolicy,
        SyncWindowType = schedule.SyncWindowType,
        SyncWindowValue = schedule.SyncWindowValue,
        LastScheduleTime = schedule.LastScheduleTime,
        NextScheduleTime = schedule.NextScheduleTime,
        AdminJobConfigJson = schedule.AdminJobConfigJson,
        Description = schedule.Description
    };
}