| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373 |
- using Admin.NET.Plugin.AiDOP.Dto.DataPlatform;
- using Admin.NET.Plugin.AiDOP.Entity.DataPlatform;
- namespace Admin.NET.Plugin.AiDOP.DataPlatform;
- /// <summary>
- /// 同步任务实体与字段映射配置 API。
- /// </summary>
- [ApiDescriptionSettings(Order = 324, Description = "数据中台同步实体与字段映射")]
- [Route("api/DataPlatform")]
- [AllowAnonymous]
- [NonUnify]
- public class MdpSyncEntityConfigService : IDynamicApiController, ITransient
- {
/// <summary>
/// Field type codes that <c>NormalizeFieldType</c> accepts; membership checks ignore case.
/// </summary>
private static readonly HashSet<string> ValidFieldTypes = new HashSet<string>(
    new[] { "DIRECT", "JSONPATH", "SCRIPT", "CONST", "LOOKUP" },
    StringComparer.OrdinalIgnoreCase);
/// <summary>SqlSugar client through which this service issues all of its queries and commands.</summary>
private readonly ISqlSugarClient _db;

/// <summary>Source of the current caller's tenant id.</summary>
private readonly UserManager _userManager;

/// <summary>
/// Stores the database client and the user manager for later use by the endpoints.
/// </summary>
/// <param name="db">SqlSugar client used for data access.</param>
/// <param name="userManager">Provides the tenant id of the current caller.</param>
public MdpSyncEntityConfigService(ISqlSugarClient db, UserManager userManager)
{
    (_db, _userManager) = (db, userManager);
}
/// <summary>
/// Lists the enabled entities that belong to a sync task.
/// </summary>
/// <remarks>
/// A row is returned when the entity has status 1, is owned by the caller's tenant or by tenant 0, and
/// either its <c>job_id</c> equals the task's job code (the task code when the task has no job code) or,
/// when the task code yields an entity code prefix such as <c>S1_</c>, its entity code starts with that prefix.
/// Every row also carries the joined data-source columns and the number of field mappings of the entity.
/// </remarks>
/// <param name="taskCode">Code of the sync task, taken from the route.</param>
/// <returns>The matching rows ordered by entity code.</returns>
[DisplayName("同步任务实体列表")]
[HttpGet("sync-tasks/{taskCode}/entities")]
public async Task<object> GetEntities(string taskCode)
{
    var tenantId = _userManager.TenantId;
    var task = await FindTaskAsync(taskCode, tenantId);
    var prefix = ResolveEntityCodePrefix(task.TaskCode);
    var jobCode = string.IsNullOrWhiteSpace(task.JobCode) ? task.TaskCode : task.JobCode;

    var where = new List<string> { "(e.tenant_id = @TenantId OR e.tenant_id = 0)", "e.status = 1" };
    var pars = new List<SugarParameter>
    {
        new("@TenantId", tenantId),
        new("@JobCode", jobCode)
    };

    if (string.IsNullOrWhiteSpace(prefix))
    {
        where.Add("e.job_id = @JobCode");
    }
    else
    {
        // '_' and '%' are LIKE wildcards. The prefix always ends in '_', so without escaping
        // "S1_" would also match codes such as "S10_...". Escape with '!' to match it literally.
        var literalPrefix = prefix.Replace("!", "!!").Replace("%", "!%").Replace("_", "!_");
        where.Add("(e.entity_code LIKE @EntityPrefix ESCAPE '!' OR e.job_id = @JobCode)");
        pars.Add(new SugarParameter("@EntityPrefix", literalPrefix + "%"));
    }

    // Only fixed SQL fragments are joined here; every value travels as a parameter.
    var whereSql = string.Join(" AND ", where);
    var list = await _db.Ado.SqlQueryAsync<MdpEntityRow>(
        $"""
        SELECT e.id AS Id, e.tenant_id AS TenantId, e.source_id AS SourceId,
        s.source_code AS SourceCode, s.source_name AS SourceName, s.source_type AS SourceType,
        e.entity_code AS EntityCode, e.entity_name AS EntityName, e.entity_type AS EntityType,
        e.source_table_name AS SourceTableName, e.source_api_path AS SourceApiPath,
        e.target_table_name AS TargetTableName, e.sync_mode AS SyncMode, e.incr_column AS IncrColumn,
        e.batch_size AS BatchSize, e.job_id AS JobId, e.status AS Status, e.remark AS Remark,
        IFNULL(m.cnt, 0) AS FieldMappingCount
        FROM mdp_entity e
        LEFT JOIN mdp_source s ON s.id = e.source_id
        LEFT JOIN (
        SELECT entity_id, COUNT(1) AS cnt FROM mdp_field_mapping GROUP BY entity_id
        ) m ON m.entity_id = e.id
        WHERE {whereSql}
        ORDER BY e.entity_code
        """,
        pars);
    return list;
}
/// <summary>
/// Creates a new entity under the given sync task and then increments the task's configuration version.
/// </summary>
/// <remarks>
/// The entity is stored under tenant 0 when the task itself belongs to tenant 0, otherwise under the
/// caller's tenant. Blank optional inputs fall back to defaults: entity type "TABLE", sync mode "INCR",
/// batch size 5000, and the task's job code (or its task code) as job id.
/// </remarks>
/// <param name="taskCode">Code of the sync task, taken from the route.</param>
/// <param name="input">Entity values from the request body; code and name are required.</param>
/// <returns>An object whose <c>id</c> is the identity of the inserted row.</returns>
[DisplayName("新增同步任务实体")]
[HttpPost("sync-tasks/{taskCode}/entities")]
public async Task<object> CreateEntity(string taskCode, [FromBody] MdpEntityUpsertInput input)
{
    ValidateEntityUpsert(input);

    var callerTenantId = _userManager.TenantId;
    var owningTask = await FindTaskAsync(taskCode, callerTenantId);
    var resolvedSourceId = await ResolveSourceIdAsync(input, callerTenantId);

    var ownerTenantId = owningTask.TenantId == 0 ? 0L : callerTenantId;
    var defaultJobId = string.IsNullOrWhiteSpace(owningTask.JobCode) ? owningTask.TaskCode : owningTask.JobCode;

    // Entity codes are unique per owning tenant.
    var codeTaken = await _db.Queryable<MdpEntity>()
        .Where(u => u.TenantId == ownerTenantId && u.EntityCode == input.EntityCode.Trim())
        .AnyAsync();
    if (codeTaken)
    {
        throw Oops.Oh("实体编码已存在");
    }

    var timestamp = DateTime.Now;
    var entityType = string.IsNullOrWhiteSpace(input.EntityType) ? "TABLE" : input.EntityType.Trim();
    var syncMode = string.IsNullOrWhiteSpace(input.SyncMode) ? "INCR" : input.SyncMode.Trim();
    var jobId = string.IsNullOrWhiteSpace(input.JobId) ? defaultJobId : input.JobId.Trim();

    var newEntity = new MdpEntity
    {
        TenantId = ownerTenantId,
        SourceId = resolvedSourceId,
        EntityCode = input.EntityCode.Trim(),
        EntityName = input.EntityName.Trim(),
        EntityType = entityType,
        SourceTableName = input.SourceTableName?.Trim(),
        SourceApiPath = input.SourceApiPath?.Trim(),
        TargetTableName = input.TargetTableName?.Trim(),
        SyncMode = syncMode,
        IncrColumn = input.IncrColumn?.Trim(),
        BatchSize = input.BatchSize <= 0 ? 5000 : input.BatchSize,
        JobId = jobId,
        Status = input.Status <= 0 ? 0 : 1,
        Remark = input.Remark?.Trim(),
        CreateTime = timestamp,
        UpdateTime = timestamp
    };

    var newId = await _db.Insertable(newEntity).ExecuteReturnBigIdentityAsync();
    await BumpTaskConfigVersionAsync(owningTask);
    return new { id = newId };
}
/// <summary>
/// Overwrites an existing sync entity with the values from the request body.
/// </summary>
/// <remarks>
/// Blank entity type and sync mode fall back to "TABLE" and "INCR"; a non-positive batch size falls back
/// to 5000. The entity code is only changed for entities that are not built in, and a changed code must
/// not collide with another entity of the same owning tenant.
/// </remarks>
/// <param name="id">Id of the entity to update, taken from the route.</param>
/// <param name="input">New entity values from the request body; code and name are required.</param>
/// <returns>An object whose <c>id</c> is the id of the updated entity.</returns>
[DisplayName("更新同步实体")]
[HttpPut("entities/{id:long}")]
public async Task<object> UpdateEntity(long id, [FromBody] MdpEntityUpsertInput input)
{
    ValidateEntityUpsert(input);
    var tenantId = _userManager.TenantId;
    var entity = await FindEditableEntityAsync(id, tenantId);

    entity.EntityName = input.EntityName.Trim();
    entity.EntityType = string.IsNullOrWhiteSpace(input.EntityType) ? "TABLE" : input.EntityType.Trim();
    // Keeps the current source when the request names neither a source id nor a source code.
    entity.SourceId = await ResolveSourceIdAsync(input, tenantId, entity.SourceId);
    entity.SourceTableName = input.SourceTableName?.Trim();
    entity.SourceApiPath = input.SourceApiPath?.Trim();
    entity.TargetTableName = input.TargetTableName?.Trim();
    entity.SyncMode = string.IsNullOrWhiteSpace(input.SyncMode) ? "INCR" : input.SyncMode.Trim();
    entity.IncrColumn = input.IncrColumn?.Trim();
    entity.BatchSize = input.BatchSize <= 0 ? 5000 : input.BatchSize;
    // NOTE(review): a missing JobId clears the column here, whereas CreateEntity falls back to the
    // task's job code. Confirm that clearing is intended.
    entity.JobId = input.JobId?.Trim();
    entity.Status = input.Status <= 0 ? 0 : 1;
    entity.Remark = input.Remark?.Trim();
    entity.UpdateTime = DateTime.Now;

    if (!IsBuiltInEntity(entity))
    {
        var requestedCode = input.EntityCode.Trim();
        if (!string.Equals(requestedCode, entity.EntityCode, StringComparison.Ordinal))
        {
            // Same uniqueness rule CreateEntity enforces: one code per owning tenant.
            var ownerTenantId = entity.TenantId;
            var codeTaken = await _db.Queryable<MdpEntity>()
                .Where(u => u.TenantId == ownerTenantId && u.EntityCode == requestedCode && u.Id != id)
                .AnyAsync();
            if (codeTaken)
            {
                throw Oops.Oh("实体编码已存在");
            }
        }

        entity.EntityCode = requestedCode;
    }

    await _db.Updateable(entity).ExecuteCommandAsync();
    return new { id = entity.Id };
}
/// <summary>
/// Soft-deletes a sync entity by setting its status to 0.
/// </summary>
/// <remarks>
/// Built-in entities owned by tenant 0 are rejected. Only the status and update-time columns are written.
/// </remarks>
/// <param name="id">Id of the entity to delete, taken from the route.</param>
/// <returns>An object whose <c>id</c> is the id of the affected entity.</returns>
[DisplayName("删除同步实体")]
[HttpDelete("entities/{id:long}")]
public async Task<object> DeleteEntity(long id)
{
    var target = await FindEditableEntityAsync(id, _userManager.TenantId);

    var isProtected = IsBuiltInEntity(target) && target.TenantId == 0;
    if (isProtected)
    {
        throw Oops.Oh("内置 MDP 实体不允许删除");
    }

    target.Status = 0;
    target.UpdateTime = DateTime.Now;
    await _db.Updateable(target)
        .UpdateColumns(u => new { u.Status, u.UpdateTime })
        .ExecuteCommandAsync();

    return new { id = target.Id };
}
/// <summary>
/// Returns the field mappings of an entity, ordered by sort order and then by target field.
/// </summary>
/// <param name="entityId">Id of the entity, taken from the route.</param>
/// <returns>A list of field-mapping rows.</returns>
[DisplayName("实体字段映射列表")]
[HttpGet("entities/{entityId:long}/field-mappings")]
public async Task<object> GetFieldMappings(long entityId)
{
    // Called only for its check: it throws when the entity is not visible to the caller.
    _ = await FindEditableEntityAsync(entityId, _userManager.TenantId);

    var mappings = await _db.Queryable<MdpFieldMapping>()
        .Where(u => u.EntityId == entityId)
        .OrderBy(u => u.SortOrder)
        .OrderBy(u => u.TargetField)
        .ToListAsync();

    var rows = new List<MdpFieldMappingRow>(mappings.Count);
    foreach (var mapping in mappings)
    {
        rows.Add(MapFieldMappingRow(mapping));
    }

    return rows;
}
/// <summary>
/// Adds a field mapping to an entity; a target field may be mapped only once per entity.
/// </summary>
/// <param name="entityId">Id of the owning entity, taken from the route.</param>
/// <param name="input">Mapping values from the request body; source and target field are required.</param>
/// <returns>An object whose <c>id</c> is the identity of the inserted mapping.</returns>
[DisplayName("新增实体字段映射")]
[HttpPost("entities/{entityId:long}/field-mappings")]
public async Task<object> CreateFieldMapping(long entityId, [FromBody] MdpFieldMappingUpsertInput input)
{
    ValidateFieldMappingUpsert(input);

    // Throws when the entity is not visible to the caller.
    _ = await FindEditableEntityAsync(entityId, _userManager.TenantId);

    var alreadyMapped = await _db.Queryable<MdpFieldMapping>()
        .Where(u => u.EntityId == entityId && u.TargetField == input.TargetField.Trim())
        .AnyAsync();
    if (alreadyMapped)
    {
        throw Oops.Oh("目标字段映射已存在");
    }

    var newMapping = MapFieldMappingInsert(entityId, input);
    var newId = await _db.Insertable(newMapping).ExecuteReturnBigIdentityAsync();
    return new { id = newId };
}
/// <summary>
/// Overwrites an existing field mapping with the values from the request body.
/// </summary>
/// <remarks>
/// The new target field must not already be used by another mapping of the same entity.
/// The field type is passed through <c>NormalizeFieldType</c> before it is stored.
/// </remarks>
/// <param name="id">Id of the mapping to update, taken from the route.</param>
/// <param name="input">New mapping values from the request body; source and target field are required.</param>
/// <returns>An object whose <c>id</c> is the id of the updated mapping.</returns>
[DisplayName("更新字段映射")]
[HttpPut("field-mappings/{id:long}")]
public async Task<object> UpdateFieldMapping(long id, [FromBody] MdpFieldMappingUpsertInput input)
{
    ValidateFieldMappingUpsert(input);
    var callerTenantId = _userManager.TenantId;

    var mapping = await _db.Queryable<MdpFieldMapping>().Where(u => u.Id == id).FirstAsync();
    if (mapping is null)
    {
        throw Oops.Oh("字段映射不存在");
    }

    // Throws when the owning entity is not visible to the caller.
    _ = await FindEditableEntityAsync(mapping.EntityId, callerTenantId);

    var targetTaken = await _db.Queryable<MdpFieldMapping>()
        .Where(u => u.EntityId == mapping.EntityId && u.TargetField == input.TargetField.Trim() && u.Id != id)
        .AnyAsync();
    if (targetTaken)
    {
        throw Oops.Oh("目标字段映射已存在");
    }

    mapping.SourceField = input.SourceField.Trim();
    mapping.TargetField = input.TargetField.Trim();
    mapping.FieldType = NormalizeFieldType(input.FieldType);
    mapping.TransformScript = input.TransformScript?.Trim();
    mapping.ConstValue = input.ConstValue?.Trim();
    mapping.LookupTable = input.LookupTable?.Trim();
    mapping.IsRequired = input.IsRequired ? 1 : 0;
    mapping.DefaultValue = input.DefaultValue?.Trim();
    mapping.SortOrder = input.SortOrder;

    await _db.Updateable(mapping).ExecuteCommandAsync();
    return new { id = mapping.Id };
}
/// <summary>
/// Deletes a field mapping row.
/// </summary>
/// <param name="id">Id of the mapping to delete, taken from the route.</param>
/// <returns>An object whose <c>id</c> is the id that was passed in.</returns>
[DisplayName("删除字段映射")]
[HttpDelete("field-mappings/{id:long}")]
public async Task<object> DeleteFieldMapping(long id)
{
    var callerTenantId = _userManager.TenantId;

    var existing = await _db.Queryable<MdpFieldMapping>().Where(u => u.Id == id).FirstAsync();
    if (existing is null)
    {
        throw Oops.Oh("字段映射不存在");
    }

    // Throws when the owning entity is not visible to the caller.
    _ = await FindEditableEntityAsync(existing.EntityId, callerTenantId);

    await _db.Deleteable<MdpFieldMapping>().Where(u => u.Id == id).ExecuteCommandAsync();
    return new { id };
}
/// <summary>
/// Loads the enabled sync task with the given code that belongs to the tenant or to tenant 0.
/// </summary>
/// <param name="taskCode">Task code to look up; must not be blank.</param>
/// <param name="tenantId">Tenant id of the caller.</param>
/// <returns>The first matching task when ordered by tenant id descending.</returns>
private async Task<MdpSyncTask> FindTaskAsync(string taskCode, long tenantId)
{
    if (string.IsNullOrWhiteSpace(taskCode))
    {
        throw Oops.Oh("任务编码不能为空");
    }

    // Descending tenant order puts a row with a positive tenant id ahead of the tenant 0 row.
    var match = await _db.Queryable<MdpSyncTask>()
        .Where(u => u.TaskCode == taskCode && (u.TenantId == tenantId || u.TenantId == 0) && u.Status == 1)
        .OrderBy(u => u.TenantId, OrderByType.Desc)
        .FirstAsync();

    return match ?? throw Oops.Oh("同步任务不存在");
}
/// <summary>
/// Loads the entity with the given id when it belongs to the tenant or to tenant 0.
/// </summary>
/// <param name="id">Entity id.</param>
/// <param name="tenantId">Tenant id of the caller.</param>
/// <returns>The matching entity; an error is raised when none is found.</returns>
private async Task<MdpEntity> FindEditableEntityAsync(long id, long tenantId)
{
    var found = await _db.Queryable<MdpEntity>()
        .Where(u => u.Id == id && (u.TenantId == tenantId || u.TenantId == 0))
        .FirstAsync();

    return found ?? throw Oops.Oh("同步实体不存在");
}
/// <summary>
/// Determines the data-source id for an entity from the request, falling back to an existing id.
/// </summary>
/// <remarks>
/// Resolution order: an explicit positive <c>SourceId</c>, then a lookup by <c>SourceCode</c>, then
/// <paramref name="fallbackSourceId"/>. Both the id and the code must refer to a source owned by the
/// tenant or by tenant 0; when a code matches in both, the row with the higher tenant id is used.
/// </remarks>
/// <param name="input">Request body carrying the optional source id and source code.</param>
/// <param name="tenantId">Tenant id of the caller.</param>
/// <param name="fallbackSourceId">Id to use when the request names no source; ignored unless positive.</param>
/// <returns>The resolved source id.</returns>
private async Task<long> ResolveSourceIdAsync(MdpEntityUpsertInput input, long tenantId, long? fallbackSourceId = null)
{
    if (input.SourceId > 0)
    {
        // A client-supplied id is not trusted: it must exist and be visible to the tenant,
        // using the same tenant filter as the lookup by code below.
        var visibleId = await _db.Ado.GetLongAsync(
            """
            SELECT id FROM mdp_source
            WHERE id = @SourceId AND (tenant_id = @TenantId OR tenant_id = 0)
            LIMIT 1
            """,
            new List<SugarParameter>
            {
                new("@SourceId", input.SourceId),
                new("@TenantId", tenantId)
            });
        if (visibleId > 0)
        {
            return input.SourceId;
        }

        throw Oops.Oh($"数据源 {input.SourceId} 不存在");
    }

    if (!string.IsNullOrWhiteSpace(input.SourceCode))
    {
        var sourceId = await _db.Ado.GetLongAsync(
            """
            SELECT id FROM mdp_source
            WHERE source_code = @SourceCode AND (tenant_id = @TenantId OR tenant_id = 0)
            ORDER BY tenant_id DESC
            LIMIT 1
            """,
            new List<SugarParameter>
            {
                new("@SourceCode", input.SourceCode.Trim()),
                new("@TenantId", tenantId)
            });
        if (sourceId > 0)
        {
            return sourceId;
        }

        throw Oops.Oh($"数据源 {input.SourceCode} 不存在");
    }

    if (fallbackSourceId is > 0)
    {
        return fallbackSourceId.Value;
    }

    throw Oops.Oh("请指定 sourceId 或 sourceCode");
}
/// <summary>
/// Increments the task's configuration version, refreshes its update time, and saves those two columns.
/// </summary>
/// <param name="task">Task whose version is incremented; the instance is modified in place.</param>
private async Task BumpTaskConfigVersionAsync(MdpSyncTask task)
{
    task.ConfigVersion++;
    task.UpdateTime = DateTime.Now;

    var update = _db.Updateable(task).UpdateColumns(u => new { u.ConfigVersion, u.UpdateTime });
    await update.ExecuteCommandAsync();
}
/// <summary>
/// Derives an entity code prefix from a task code.
/// </summary>
/// <remarks>
/// The first non-empty segment of the task code, split on '_', is used when it is at least two
/// characters long and starts with an upper-case 'S' followed by a digit, e.g. "S1_ORDER" gives "S1_".
/// </remarks>
/// <param name="taskCode">Task code to inspect.</param>
/// <returns>The segment followed by '_', or <c>null</c> when the task code has no such segment.</returns>
private static string? ResolveEntityCodePrefix(string taskCode)
{
    if (string.IsNullOrWhiteSpace(taskCode))
    {
        return null;
    }

    var segments = taskCode.Split('_', StringSplitOptions.RemoveEmptyEntries);
    if (segments.Length == 0)
    {
        return null;
    }

    var head = segments[0];
    var matches = !string.IsNullOrWhiteSpace(head)
        && head.Length >= 2
        && head[0] == 'S'
        && char.IsDigit(head[1]);

    return matches ? head + "_" : null;
}
/// <summary>
/// Tells whether an entity counts as built in, i.e. its code starts with S1_, S2_, S3_ or S4_
/// (compared without regard to case).
/// </summary>
/// <param name="entity">Entity whose code is inspected.</param>
/// <returns><c>true</c> when the code starts with one of the four prefixes.</returns>
private static bool IsBuiltInEntity(MdpEntity entity)
{
    var code = entity.EntityCode;
    foreach (var prefix in new[] { "S1_", "S2_", "S3_", "S4_" })
    {
        if (code.StartsWith(prefix, StringComparison.OrdinalIgnoreCase))
        {
            return true;
        }
    }

    return false;
}
/// <summary>
/// Rejects an entity request whose code or name is missing or blank.
/// </summary>
/// <param name="input">Request body to check.</param>
private static void ValidateEntityUpsert(MdpEntityUpsertInput input)
{
    // Checked in this order, so a missing code is reported before a missing name.
    var requiredFields = new (string? Value, string Error)[]
    {
        (input.EntityCode, "实体编码不能为空"),
        (input.EntityName, "实体名称不能为空")
    };

    foreach (var (value, error) in requiredFields)
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            throw Oops.Oh(error);
        }
    }
}
/// <summary>
/// Rejects a field-mapping request whose source field or target field is missing or blank.
/// </summary>
/// <param name="input">Request body to check.</param>
private static void ValidateFieldMappingUpsert(MdpFieldMappingUpsertInput input)
{
    // Checked in this order, so a missing source field is reported before a missing target field.
    var requiredFields = new (string? Value, string Error)[]
    {
        (input.SourceField, "源字段不能为空"),
        (input.TargetField, "目标字段不能为空")
    };

    foreach (var (value, error) in requiredFields)
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            throw Oops.Oh(error);
        }
    }
}
/// <summary>
/// Converts a requested field type to its stored form: trimmed and upper-cased.
/// </summary>
/// <param name="fieldType">Field type from the request; may be null or blank.</param>
/// <returns>
/// The normalized type when it is one of <c>ValidFieldTypes</c>; otherwise "DIRECT",
/// which is also the result for null or blank input.
/// </returns>
private static string NormalizeFieldType(string? fieldType)
{
    if (string.IsNullOrWhiteSpace(fieldType))
    {
        return "DIRECT";
    }

    var candidate = fieldType.Trim().ToUpperInvariant();
    if (ValidFieldTypes.Contains(candidate))
    {
        return candidate;
    }

    // Unrecognized types are not rejected; they silently become DIRECT.
    return "DIRECT";
}
/// <summary>
/// Builds a new field-mapping row for an entity from a request body.
/// </summary>
/// <remarks>
/// Text values are trimmed, the field type is normalized, the required flag is stored as 1 or 0,
/// and the creation time is set to the current local time.
/// </remarks>
/// <param name="entityId">Id of the entity the mapping belongs to.</param>
/// <param name="input">Validated request body.</param>
/// <returns>A row that has not been saved yet.</returns>
private static MdpFieldMapping MapFieldMappingInsert(long entityId, MdpFieldMappingUpsertInput input)
{
    var mapping = new MdpFieldMapping
    {
        EntityId = entityId,
        SourceField = input.SourceField.Trim(),
        TargetField = input.TargetField.Trim(),
        FieldType = NormalizeFieldType(input.FieldType),
        TransformScript = input.TransformScript?.Trim(),
        ConstValue = input.ConstValue?.Trim(),
        LookupTable = input.LookupTable?.Trim(),
        IsRequired = input.IsRequired ? 1 : 0,
        DefaultValue = input.DefaultValue?.Trim(),
        SortOrder = input.SortOrder,
        CreateTime = DateTime.Now
    };

    return mapping;
}
/// <summary>
/// Converts a stored field mapping into the row shape returned by the API.
/// </summary>
/// <param name="row">Stored mapping.</param>
/// <returns>A row with the same values; the numeric required flag becomes a boolean (non-zero is true).</returns>
private static MdpFieldMappingRow MapFieldMappingRow(MdpFieldMapping row)
{
    var result = new MdpFieldMappingRow
    {
        Id = row.Id,
        EntityId = row.EntityId,
        SourceField = row.SourceField,
        TargetField = row.TargetField,
        FieldType = row.FieldType,
        TransformScript = row.TransformScript,
        ConstValue = row.ConstValue,
        LookupTable = row.LookupTable,
        IsRequired = row.IsRequired != 0,
        DefaultValue = row.DefaultValue,
        SortOrder = row.SortOrder
    };

    return result;
}
- }
|