using Admin.NET.Plugin.AiDOP.Dto.DataPlatform;
using Admin.NET.Plugin.AiDOP.Entity.DataPlatform;

namespace Admin.NET.Plugin.AiDOP.DataPlatform;

/// <summary>
/// Configuration API for sync-task entities and their field mappings.
/// </summary>
[ApiDescriptionSettings(Order = 324, Description = "数据中台同步实体与字段映射")]
[Route("api/DataPlatform")]
[AllowAnonymous]
[NonUnify]
public class MdpSyncEntityConfigService : IDynamicApiController, ITransient
{
    /// <summary>Entity type stored when the request leaves it blank.</summary>
    private const string DefaultEntityType = "TABLE";

    /// <summary>Sync mode stored when the request leaves it blank.</summary>
    private const string DefaultSyncMode = "INCR";

    /// <summary>Batch size stored when the request supplies a non-positive value.</summary>
    private const int DefaultBatchSize = 5000;

    /// <summary>Field type used when the request leaves it blank or supplies an unknown value.</summary>
    private const string DefaultFieldType = "DIRECT";

    /// <summary>Field types accepted by <see cref="NormalizeFieldType"/>.</summary>
    private static readonly HashSet<string> ValidFieldTypes = new(StringComparer.OrdinalIgnoreCase)
    {
        "DIRECT", "JSONPATH", "SCRIPT", "CONST", "LOOKUP"
    };

    private readonly ISqlSugarClient _db;
    private readonly UserManager _userManager;

    public MdpSyncEntityConfigService(ISqlSugarClient db, UserManager userManager)
    {
        _db = db;
        _userManager = userManager;
    }

    /// <summary>
    /// Lists the enabled (status = 1) entities of a sync task that belong to the current tenant or to tenant 0.
    /// An entity is matched by job id, and additionally by entity-code prefix when the task code yields one
    /// (see <see cref="ResolveEntityCodePrefix"/>).
    /// </summary>
    /// <param name="taskCode">Code of the sync task.</param>
    /// <returns>Entity rows joined with their source and field-mapping count, ordered by entity code.</returns>
    // NOTE(review): the row DTO type name (MdpEntityRow) was reconstructed by analogy with
    // MdpFieldMappingRow — confirm it matches the DTO declared in Dto.DataPlatform.
    [DisplayName("同步任务实体列表")]
    [HttpGet("sync-tasks/{taskCode}/entities")]
    public async Task<List<MdpEntityRow>> GetEntities(string taskCode)
    {
        var tenantId = _userManager.TenantId;
        var task = await FindTaskAsync(taskCode, tenantId);
        var prefix = ResolveEntityCodePrefix(task.TaskCode);
        var jobCode = string.IsNullOrWhiteSpace(task.JobCode) ? task.TaskCode : task.JobCode;

        var where = new List<string>
        {
            "(e.tenant_id = @TenantId OR e.tenant_id = 0)",
            "e.status = 1"
        };
        var pars = new List<SugarParameter>
        {
            new("@TenantId", tenantId),
            new("@JobCode", jobCode)
        };

        if (!string.IsNullOrWhiteSpace(prefix))
        {
            // The prefix ends with '_' which is a single-character wildcard in LIKE; escape it so
            // only codes that literally start with e.g. "S1_" are matched.
            where.Add("(e.entity_code LIKE @EntityPrefix ESCAPE '!' OR e.job_id = @JobCode)");
            pars.Add(new SugarParameter("@EntityPrefix", EscapeLikePattern(prefix) + "%"));
        }
        else
        {
            where.Add("e.job_id = @JobCode");
        }

        // whereSql is assembled from the fixed fragments above only; all values travel as parameters.
        var whereSql = string.Join(" AND ", where);
        var list = await _db.Ado.SqlQueryAsync<MdpEntityRow>(
            $"""
            SELECT e.id AS Id,
                   e.tenant_id AS TenantId,
                   e.source_id AS SourceId,
                   s.source_code AS SourceCode,
                   s.source_name AS SourceName,
                   s.source_type AS SourceType,
                   e.entity_code AS EntityCode,
                   e.entity_name AS EntityName,
                   e.entity_type AS EntityType,
                   e.source_table_name AS SourceTableName,
                   e.source_api_path AS SourceApiPath,
                   e.target_table_name AS TargetTableName,
                   e.sync_mode AS SyncMode,
                   e.incr_column AS IncrColumn,
                   e.batch_size AS BatchSize,
                   e.job_id AS JobId,
                   e.status AS Status,
                   e.remark AS Remark,
                   IFNULL(m.cnt, 0) AS FieldMappingCount
            FROM mdp_entity e
            LEFT JOIN mdp_source s ON s.id = e.source_id
            LEFT JOIN (
                SELECT entity_id, COUNT(1) AS cnt
                FROM mdp_field_mapping
                GROUP BY entity_id
            ) m ON m.entity_id = e.id
            WHERE {whereSql}
            ORDER BY e.entity_code
            """,
            pars);
        return list;
    }

    /// <summary>
    /// Creates an entity under the given sync task and increments the task's config version.
    /// The entity is stored under tenant 0 when the task itself belongs to tenant 0, otherwise under the
    /// current tenant.
    /// </summary>
    /// <param name="taskCode">Code of the owning sync task.</param>
    /// <param name="input">Entity payload; entity code and name are required.</param>
    /// <returns>An object carrying the new row's <c>id</c>.</returns>
    [DisplayName("新增同步任务实体")]
    [HttpPost("sync-tasks/{taskCode}/entities")]
    public async Task<object> CreateEntity(string taskCode, [FromBody] MdpEntityUpsertInput input)
    {
        ValidateEntityUpsert(input);

        var tenantId = _userManager.TenantId;
        var task = await FindTaskAsync(taskCode, tenantId);
        var sourceId = await ResolveSourceIdAsync(input, tenantId);
        var entityTenantId = task.TenantId == 0 ? 0L : tenantId;
        var jobCode = string.IsNullOrWhiteSpace(task.JobCode) ? task.TaskCode : task.JobCode;
        var entityCode = input.EntityCode.Trim();

        var exists = await _db.Queryable<MdpEntity>()
            .Where(u => u.TenantId == entityTenantId && u.EntityCode == entityCode)
            .AnyAsync();
        if (exists)
            throw Oops.Oh("实体编码已存在");

        var now = DateTime.Now;
        var entity = new MdpEntity
        {
            TenantId = entityTenantId,
            SourceId = sourceId,
            EntityCode = entityCode,
            EntityName = input.EntityName.Trim(),
            EntityType = TrimOrDefault(input.EntityType, DefaultEntityType),
            SourceTableName = input.SourceTableName?.Trim(),
            SourceApiPath = input.SourceApiPath?.Trim(),
            TargetTableName = input.TargetTableName?.Trim(),
            SyncMode = TrimOrDefault(input.SyncMode, DefaultSyncMode),
            IncrColumn = input.IncrColumn?.Trim(),
            BatchSize = input.BatchSize <= 0 ? DefaultBatchSize : input.BatchSize,
            // A blank job id falls back to the task's job code (or task code).
            JobId = TrimOrDefault(input.JobId, jobCode),
            Status = input.Status <= 0 ? 0 : 1,
            Remark = input.Remark?.Trim(),
            CreateTime = now,
            UpdateTime = now
        };

        var id = await _db.Insertable(entity).ExecuteReturnBigIdentityAsync();
        await BumpTaskConfigVersionAsync(task);
        return new { id };
    }

    /// <summary>
    /// Updates an existing entity. The entity code of a built-in entity (see <see cref="IsBuiltInEntity"/>)
    /// is never changed; for other entities a changed code must be unique within the entity's tenant.
    /// A blank <c>JobId</c> in the payload keeps the stored job id.
    /// </summary>
    /// <param name="id">Primary key of the entity.</param>
    /// <param name="input">Entity payload; entity code and name are required.</param>
    /// <returns>An object carrying the entity's <c>id</c>.</returns>
    [DisplayName("更新同步实体")]
    [HttpPut("entities/{id:long}")]
    public async Task<object> UpdateEntity(long id, [FromBody] MdpEntityUpsertInput input)
    {
        ValidateEntityUpsert(input);

        var tenantId = _userManager.TenantId;
        var entity = await FindEditableEntityAsync(id, tenantId);

        if (!IsBuiltInEntity(entity))
        {
            var newCode = input.EntityCode.Trim();
            if (!string.Equals(newCode, entity.EntityCode, StringComparison.Ordinal))
            {
                // Same uniqueness rule CreateEntity enforces, applied to renames.
                var ownerTenantId = entity.TenantId;
                var duplicate = await _db.Queryable<MdpEntity>()
                    .Where(u => u.TenantId == ownerTenantId && u.EntityCode == newCode && u.Id != id)
                    .AnyAsync();
                if (duplicate)
                    throw Oops.Oh("实体编码已存在");
            }

            entity.EntityCode = newCode;
        }

        entity.EntityName = input.EntityName.Trim();
        entity.EntityType = TrimOrDefault(input.EntityType, DefaultEntityType);
        entity.SourceId = await ResolveSourceIdAsync(input, tenantId, entity.SourceId);
        entity.SourceTableName = input.SourceTableName?.Trim();
        entity.SourceApiPath = input.SourceApiPath?.Trim();
        entity.TargetTableName = input.TargetTableName?.Trim();
        entity.SyncMode = TrimOrDefault(input.SyncMode, DefaultSyncMode);
        entity.IncrColumn = input.IncrColumn?.Trim();
        entity.BatchSize = input.BatchSize <= 0 ? DefaultBatchSize : input.BatchSize;

        // Keep the stored job id when the payload omits it: GetEntities matches on job_id, so blanking
        // it would silently detach the entity from its task.
        if (!string.IsNullOrWhiteSpace(input.JobId))
            entity.JobId = input.JobId.Trim();

        entity.Status = input.Status <= 0 ? 0 : 1;
        entity.Remark = input.Remark?.Trim();
        entity.UpdateTime = DateTime.Now;

        await _db.Updateable(entity).ExecuteCommandAsync();
        return new { id = entity.Id };
    }

    /// <summary>
    /// Soft-deletes an entity by setting its status to 0. Built-in entities owned by tenant 0 are rejected.
    /// </summary>
    /// <param name="id">Primary key of the entity.</param>
    /// <returns>An object carrying the entity's <c>id</c>.</returns>
    [DisplayName("删除同步实体")]
    [HttpDelete("entities/{id:long}")]
    public async Task<object> DeleteEntity(long id)
    {
        var tenantId = _userManager.TenantId;
        var entity = await FindEditableEntityAsync(id, tenantId);
        if (IsBuiltInEntity(entity) && entity.TenantId == 0)
            throw Oops.Oh("内置 MDP 实体不允许删除");

        entity.Status = 0;
        entity.UpdateTime = DateTime.Now;
        await _db.Updateable(entity)
            .UpdateColumns(u => new { u.Status, u.UpdateTime })
            .ExecuteCommandAsync();
        return new { id = entity.Id };
    }

    /// <summary>
    /// Lists the field mappings of an entity, ordered by sort order and then target field.
    /// </summary>
    /// <param name="entityId">Primary key of the entity; it must be visible to the current tenant.</param>
    [DisplayName("实体字段映射列表")]
    [HttpGet("entities/{entityId:long}/field-mappings")]
    public async Task<List<MdpFieldMappingRow>> GetFieldMappings(long entityId)
    {
        var tenantId = _userManager.TenantId;
        // Called only for its existence/tenant check; the entity itself is not needed.
        await FindEditableEntityAsync(entityId, tenantId);

        var list = await _db.Queryable<MdpFieldMapping>()
            .Where(u => u.EntityId == entityId)
            .OrderBy(u => u.SortOrder)
            .OrderBy(u => u.TargetField)
            .ToListAsync();
        return list.Select(MapFieldMappingRow).ToList();
    }

    /// <summary>
    /// Adds a field mapping to an entity. The target field must be unique within the entity.
    /// </summary>
    /// <param name="entityId">Primary key of the entity; it must be visible to the current tenant.</param>
    /// <param name="input">Mapping payload; source and target field are required.</param>
    /// <returns>An object carrying the new row's <c>id</c>.</returns>
    [DisplayName("新增实体字段映射")]
    [HttpPost("entities/{entityId:long}/field-mappings")]
    public async Task<object> CreateFieldMapping(long entityId, [FromBody] MdpFieldMappingUpsertInput input)
    {
        ValidateFieldMappingUpsert(input);

        var tenantId = _userManager.TenantId;
        await FindEditableEntityAsync(entityId, tenantId);

        var targetField = input.TargetField.Trim();
        var exists = await _db.Queryable<MdpFieldMapping>()
            .Where(u => u.EntityId == entityId && u.TargetField == targetField)
            .AnyAsync();
        if (exists)
            throw Oops.Oh("目标字段映射已存在");

        var entity = MapFieldMappingInsert(entityId, input);
        var id = await _db.Insertable(entity).ExecuteReturnBigIdentityAsync();
        return new { id };
    }

    /// <summary>
    /// Updates a field mapping. The target field must stay unique within the owning entity.
    /// </summary>
    /// <param name="id">Primary key of the field mapping.</param>
    /// <param name="input">Mapping payload; source and target field are required.</param>
    /// <returns>An object carrying the mapping's <c>id</c>.</returns>
    [DisplayName("更新字段映射")]
    [HttpPut("field-mappings/{id:long}")]
    public async Task<object> UpdateFieldMapping(long id, [FromBody] MdpFieldMappingUpsertInput input)
    {
        ValidateFieldMappingUpsert(input);

        var tenantId = _userManager.TenantId;
        var mapping = await _db.Queryable<MdpFieldMapping>().Where(u => u.Id == id).FirstAsync();
        if (mapping == null)
            throw Oops.Oh("字段映射不存在");
        await FindEditableEntityAsync(mapping.EntityId, tenantId);

        var ownerEntityId = mapping.EntityId;
        var targetField = input.TargetField.Trim();
        var duplicate = await _db.Queryable<MdpFieldMapping>()
            .Where(u => u.EntityId == ownerEntityId && u.TargetField == targetField && u.Id != id)
            .AnyAsync();
        if (duplicate)
            throw Oops.Oh("目标字段映射已存在");

        mapping.SourceField = input.SourceField.Trim();
        mapping.TargetField = targetField;
        mapping.FieldType = NormalizeFieldType(input.FieldType);
        mapping.TransformScript = input.TransformScript?.Trim();
        mapping.ConstValue = input.ConstValue?.Trim();
        mapping.LookupTable = input.LookupTable?.Trim();
        mapping.IsRequired = input.IsRequired ? 1 : 0;
        mapping.DefaultValue = input.DefaultValue?.Trim();
        mapping.SortOrder = input.SortOrder;

        await _db.Updateable(mapping).ExecuteCommandAsync();
        return new { id = mapping.Id };
    }

    /// <summary>
    /// Deletes a field mapping row.
    /// </summary>
    /// <param name="id">Primary key of the field mapping.</param>
    /// <returns>An object carrying the deleted mapping's <c>id</c>.</returns>
    [DisplayName("删除字段映射")]
    [HttpDelete("field-mappings/{id:long}")]
    public async Task<object> DeleteFieldMapping(long id)
    {
        var tenantId = _userManager.TenantId;
        var mapping = await _db.Queryable<MdpFieldMapping>().Where(u => u.Id == id).FirstAsync();
        if (mapping == null)
            throw Oops.Oh("字段映射不存在");
        await FindEditableEntityAsync(mapping.EntityId, tenantId);

        await _db.Deleteable<MdpFieldMapping>().Where(u => u.Id == id).ExecuteCommandAsync();
        return new { id };
    }

    /// <summary>
    /// Loads the enabled sync task with the given code that belongs to the tenant or to tenant 0.
    /// Rows are ordered by tenant id descending before the first one is taken.
    /// </summary>
    /// <exception>Raised through <c>Oops.Oh</c> when the code is blank or no task matches.</exception>
    private async Task<MdpSyncTask> FindTaskAsync(string taskCode, long tenantId)
    {
        if (string.IsNullOrWhiteSpace(taskCode))
            throw Oops.Oh("任务编码不能为空");

        var task = await _db.Queryable<MdpSyncTask>()
            .Where(u => u.TaskCode == taskCode && (u.TenantId == tenantId || u.TenantId == 0) && u.Status == 1)
            .OrderBy(u => u.TenantId, OrderByType.Desc)
            .FirstAsync();
        if (task == null)
            throw Oops.Oh("同步任务不存在");
        return task;
    }

    /// <summary>
    /// Loads an entity by id, restricted to rows owned by the tenant or by tenant 0.
    /// </summary>
    /// <exception>Raised through <c>Oops.Oh</c> when no such entity exists.</exception>
    private async Task<MdpEntity> FindEditableEntityAsync(long id, long tenantId)
    {
        var entity = await _db.Queryable<MdpEntity>()
            .Where(u => u.Id == id && (u.TenantId == tenantId || u.TenantId == 0))
            .FirstAsync();
        if (entity == null)
            throw Oops.Oh("同步实体不存在");
        return entity;
    }

    /// <summary>
    /// Determines the source id for an upsert: an explicit positive <c>SourceId</c> wins, then a lookup by
    /// <c>SourceCode</c> (tenant's own row or tenant 0), then <paramref name="fallbackSourceId"/>.
    /// </summary>
    /// <exception>Raised through <c>Oops.Oh</c> when the code is unknown or nothing identifies a source.</exception>
    private async Task<long> ResolveSourceIdAsync(MdpEntityUpsertInput input, long tenantId, long? fallbackSourceId = null)
    {
        if (input.SourceId > 0)
            return input.SourceId;

        if (!string.IsNullOrWhiteSpace(input.SourceCode))
        {
            var sourceId = await _db.Ado.GetLongAsync(
                """
                SELECT id
                FROM mdp_source
                WHERE source_code = @SourceCode
                  AND (tenant_id = @TenantId OR tenant_id = 0)
                ORDER BY tenant_id DESC
                LIMIT 1
                """,
                new List<SugarParameter>
                {
                    new("@SourceCode", input.SourceCode.Trim()),
                    new("@TenantId", tenantId)
                });
            if (sourceId > 0)
                return sourceId;
            throw Oops.Oh($"数据源 {input.SourceCode} 不存在");
        }

        if (fallbackSourceId is > 0)
            return fallbackSourceId.Value;

        throw Oops.Oh("请指定 sourceId 或 sourceCode");
    }

    /// <summary>
    /// Increments the task's config version, refreshes its update time and persists only those two columns.
    /// </summary>
    private async Task BumpTaskConfigVersionAsync(MdpSyncTask task)
    {
        task.ConfigVersion += 1;
        task.UpdateTime = DateTime.Now;
        await _db.Updateable(task)
            .UpdateColumns(u => new { u.ConfigVersion, u.UpdateTime })
            .ExecuteCommandAsync();
    }

    /// <summary>
    /// Derives an entity-code prefix from a task code: the first '_'-separated segment followed by '_',
    /// provided that segment starts with 'S' and a digit (e.g. "S1_ORDERS" yields "S1_"). Returns null otherwise.
    /// </summary>
    private static string? ResolveEntityCodePrefix(string taskCode)
    {
        if (string.IsNullOrWhiteSpace(taskCode))
            return null;

        var first = taskCode.Split('_', StringSplitOptions.RemoveEmptyEntries).FirstOrDefault();
        if (string.IsNullOrWhiteSpace(first) || first.Length < 2 || first[0] != 'S' || !char.IsDigit(first[1]))
            return null;

        return first + "_";
    }

    /// <summary>
    /// Escapes the LIKE wildcards '%' and '_' (and the escape character itself) with '!' so the value is
    /// matched literally by a <c>LIKE ... ESCAPE '!'</c> clause.
    /// </summary>
    private static string EscapeLikePattern(string value) =>
        value.Replace("!", "!!").Replace("%", "!%").Replace("_", "!_");

    /// <summary>
    /// Returns the trimmed value, or <paramref name="fallback"/> when the value is null or whitespace.
    /// </summary>
    private static string TrimOrDefault(string? value, string fallback) =>
        string.IsNullOrWhiteSpace(value) ? fallback : value.Trim();

    /// <summary>
    /// True when the entity code starts with "S1_", "S2_", "S3_" or "S4_" (case-insensitive).
    /// </summary>
    private static bool IsBuiltInEntity(MdpEntity entity) =>
        entity.EntityCode.StartsWith("S1_", StringComparison.OrdinalIgnoreCase)
        || entity.EntityCode.StartsWith("S2_", StringComparison.OrdinalIgnoreCase)
        || entity.EntityCode.StartsWith("S3_", StringComparison.OrdinalIgnoreCase)
        || entity.EntityCode.StartsWith("S4_", StringComparison.OrdinalIgnoreCase);

    /// <summary>
    /// Rejects an entity payload that is missing, or whose entity code or name is blank.
    /// </summary>
    private static void ValidateEntityUpsert(MdpEntityUpsertInput input)
    {
        // A missing request body binds to null; fail with a friendly error instead of a NullReferenceException.
        if (input == null)
            throw Oops.Oh("请求参数不能为空");
        if (string.IsNullOrWhiteSpace(input.EntityCode))
            throw Oops.Oh("实体编码不能为空");
        if (string.IsNullOrWhiteSpace(input.EntityName))
            throw Oops.Oh("实体名称不能为空");
    }

    /// <summary>
    /// Rejects a field-mapping payload that is missing, or whose source or target field is blank.
    /// </summary>
    private static void ValidateFieldMappingUpsert(MdpFieldMappingUpsertInput input)
    {
        // A missing request body binds to null; fail with a friendly error instead of a NullReferenceException.
        if (input == null)
            throw Oops.Oh("请求参数不能为空");
        if (string.IsNullOrWhiteSpace(input.SourceField))
            throw Oops.Oh("源字段不能为空");
        if (string.IsNullOrWhiteSpace(input.TargetField))
            throw Oops.Oh("目标字段不能为空");
    }

    /// <summary>
    /// Trims and upper-cases the field type; blank or unrecognised values become "DIRECT".
    /// </summary>
    private static string NormalizeFieldType(string? fieldType)
    {
        var normalized = string.IsNullOrWhiteSpace(fieldType)
            ? DefaultFieldType
            : fieldType.Trim().ToUpperInvariant();
        return ValidFieldTypes.Contains(normalized) ? normalized : DefaultFieldType;
    }

    /// <summary>
    /// Builds a new field-mapping entity for insertion from the request payload.
    /// </summary>
    private static MdpFieldMapping MapFieldMappingInsert(long entityId, MdpFieldMappingUpsertInput input) => new()
    {
        EntityId = entityId,
        SourceField = input.SourceField.Trim(),
        TargetField = input.TargetField.Trim(),
        FieldType = NormalizeFieldType(input.FieldType),
        TransformScript = input.TransformScript?.Trim(),
        ConstValue = input.ConstValue?.Trim(),
        LookupTable = input.LookupTable?.Trim(),
        IsRequired = input.IsRequired ? 1 : 0,
        DefaultValue = input.DefaultValue?.Trim(),
        SortOrder = input.SortOrder,
        CreateTime = DateTime.Now
    };

    /// <summary>
    /// Projects a stored field mapping to its API row, turning the numeric required flag into a bool.
    /// </summary>
    private static MdpFieldMappingRow MapFieldMappingRow(MdpFieldMapping row) => new()
    {
        Id = row.Id,
        EntityId = row.EntityId,
        SourceField = row.SourceField,
        TargetField = row.TargetField,
        FieldType = row.FieldType,
        TransformScript = row.TransformScript,
        ConstValue = row.ConstValue,
        LookupTable = row.LookupTable,
        IsRequired = row.IsRequired != 0,
        DefaultValue = row.DefaultValue,
        SortOrder = row.SortOrder
    };
}