MdpSyncEntityConfigService.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. using Admin.NET.Plugin.AiDOP.Dto.DataPlatform;
  2. using Admin.NET.Plugin.AiDOP.Entity.DataPlatform;
  3. namespace Admin.NET.Plugin.AiDOP.DataPlatform;
  /// <summary>
  /// API for configuring the entities that belong to a sync task and the field mappings of each entity.
  /// </summary>
  /// <remarks>
  /// All data access is scoped to the current tenant (<c>_userManager.TenantId</c>) plus the shared
  /// tenant <c>0</c>.
  /// NOTE(review): the controller is marked <c>[AllowAnonymous]</c> although every action relies on the
  /// caller's tenant id — confirm that anonymous access is really intended.
  /// </remarks>
  [ApiDescriptionSettings(Order = 324, Description = "数据中台同步实体与字段映射")]
  [Route("api/DataPlatform")]
  [AllowAnonymous]
  [NonUnify]
  public class MdpSyncEntityConfigService : IDynamicApiController, ITransient
  {
      /// <summary>Field mapping types accepted by <see cref="NormalizeFieldType"/>.</summary>
      private static readonly HashSet<string> ValidFieldTypes = new(StringComparer.OrdinalIgnoreCase)
      {
          "DIRECT", "JSONPATH", "SCRIPT", "CONST", "LOOKUP"
      };

      /// <summary>Entity-code prefixes that mark an entity as built in.</summary>
      private static readonly string[] BuiltInEntityCodePrefixes = { "S1_", "S2_", "S3_", "S4_" };

      /// <summary>Escape character used in the SQL <c>LIKE ... ESCAPE</c> clause of <see cref="GetEntities"/>.</summary>
      private const string LikeEscape = "!";

      private readonly ISqlSugarClient _db;
      private readonly UserManager _userManager;

      public MdpSyncEntityConfigService(ISqlSugarClient db, UserManager userManager)
      {
          _db = db;
          _userManager = userManager;
      }

      /// <summary>
      /// Lists the active (<c>status = 1</c>) entities of a sync task, together with the number of field
      /// mappings defined for each entity.
      /// </summary>
      /// <param name="taskCode">Code of the sync task.</param>
      /// <returns>A list of <c>MdpEntityRow</c> ordered by entity code.</returns>
      /// <remarks>
      /// An entity belongs to the task when its <c>job_id</c> equals the task's job code (falling back to
      /// the task code) or, when the task code starts with an <c>S&lt;digit&gt;</c> segment, when its entity
      /// code starts with that segment followed by an underscore.
      /// </remarks>
      [DisplayName("同步任务实体列表")]
      [HttpGet("sync-tasks/{taskCode}/entities")]
      public async Task<object> GetEntities(string taskCode)
      {
          var tenantId = _userManager.TenantId;
          var task = await FindTaskAsync(taskCode, tenantId);
          var prefix = ResolveEntityCodePrefix(task.TaskCode);
          var jobCode = string.IsNullOrWhiteSpace(task.JobCode) ? task.TaskCode : task.JobCode;

          var where = new List<string> { "(e.tenant_id = @TenantId OR e.tenant_id = 0)", "e.status = 1" };
          var pars = new List<SugarParameter> { new("@TenantId", tenantId) };
          if (!string.IsNullOrWhiteSpace(prefix))
          {
              // "_" and "%" are LIKE wildcards: without escaping, the prefix "S1_" would also match
              // codes such as "S10_ORDER". Escape the prefix so it is matched literally.
              where.Add($"(e.entity_code LIKE @EntityPrefix ESCAPE '{LikeEscape}' OR e.job_id = @JobCode)");
              pars.Add(new SugarParameter("@EntityPrefix", EscapeLikePattern(prefix) + "%"));
              pars.Add(new SugarParameter("@JobCode", jobCode));
          }
          else
          {
              where.Add("e.job_id = @JobCode");
              pars.Add(new SugarParameter("@JobCode", jobCode));
          }

          // Only the fixed fragments above are interpolated; all values travel as parameters.
          var whereSql = string.Join(" AND ", where);
          var list = await _db.Ado.SqlQueryAsync<MdpEntityRow>(
              $"""
              SELECT e.id AS Id, e.tenant_id AS TenantId, e.source_id AS SourceId,
                     s.source_code AS SourceCode, s.source_name AS SourceName, s.source_type AS SourceType,
                     e.entity_code AS EntityCode, e.entity_name AS EntityName, e.entity_type AS EntityType,
                     e.source_table_name AS SourceTableName, e.source_api_path AS SourceApiPath,
                     e.target_table_name AS TargetTableName, e.sync_mode AS SyncMode, e.incr_column AS IncrColumn,
                     e.batch_size AS BatchSize, e.job_id AS JobId, e.status AS Status, e.remark AS Remark,
                     IFNULL(m.cnt, 0) AS FieldMappingCount
              FROM mdp_entity e
              LEFT JOIN mdp_source s ON s.id = e.source_id
              LEFT JOIN (
                  SELECT entity_id, COUNT(1) AS cnt FROM mdp_field_mapping GROUP BY entity_id
              ) m ON m.entity_id = e.id
              WHERE {whereSql}
              ORDER BY e.entity_code
              """,
              pars);
          return list;
      }

      /// <summary>
      /// Creates a new entity under a sync task and bumps the task's configuration version.
      /// </summary>
      /// <param name="taskCode">Code of the owning sync task.</param>
      /// <param name="input">Entity values; entity code and name are required.</param>
      /// <returns>An object carrying the generated <c>id</c>.</returns>
      /// <remarks>
      /// The entity is stored under tenant <c>0</c> when the task itself belongs to tenant <c>0</c>,
      /// otherwise under the current tenant. Blank optional values fall back to defaults
      /// (<c>TABLE</c>, <c>INCR</c>, batch size <c>5000</c>, the task's job code).
      /// </remarks>
      [DisplayName("新增同步任务实体")]
      [HttpPost("sync-tasks/{taskCode}/entities")]
      public async Task<object> CreateEntity(string taskCode, [FromBody] MdpEntityUpsertInput input)
      {
          ValidateEntityUpsert(input);
          var tenantId = _userManager.TenantId;
          var task = await FindTaskAsync(taskCode, tenantId);
          var sourceId = await ResolveSourceIdAsync(input, tenantId);
          var entityTenantId = task.TenantId == 0 ? 0L : tenantId;
          var jobCode = string.IsNullOrWhiteSpace(task.JobCode) ? task.TaskCode : task.JobCode;
          var entityCode = input.EntityCode.Trim();

          var exists = await _db.Queryable<MdpEntity>()
              .Where(u => u.TenantId == entityTenantId && u.EntityCode == entityCode)
              .AnyAsync();
          if (exists)
              throw Oops.Oh("实体编码已存在");

          var now = DateTime.Now;
          var entity = new MdpEntity
          {
              TenantId = entityTenantId,
              SourceId = sourceId,
              EntityCode = entityCode,
              EntityName = input.EntityName.Trim(),
              EntityType = string.IsNullOrWhiteSpace(input.EntityType) ? "TABLE" : input.EntityType.Trim(),
              SourceTableName = input.SourceTableName?.Trim(),
              SourceApiPath = input.SourceApiPath?.Trim(),
              TargetTableName = input.TargetTableName?.Trim(),
              SyncMode = string.IsNullOrWhiteSpace(input.SyncMode) ? "INCR" : input.SyncMode.Trim(),
              IncrColumn = input.IncrColumn?.Trim(),
              BatchSize = input.BatchSize <= 0 ? 5000 : input.BatchSize,
              JobId = string.IsNullOrWhiteSpace(input.JobId) ? jobCode : input.JobId.Trim(),
              Status = input.Status <= 0 ? 0 : 1,
              Remark = input.Remark?.Trim(),
              CreateTime = now,
              UpdateTime = now
          };
          var id = await _db.Insertable(entity).ExecuteReturnBigIdentityAsync();
          await BumpTaskConfigVersionAsync(task);
          return new { id };
      }

      /// <summary>
      /// Updates an existing entity.
      /// </summary>
      /// <param name="id">Id of the entity to update.</param>
      /// <param name="input">New entity values; entity code and name are required.</param>
      /// <returns>An object carrying the entity <c>id</c>.</returns>
      /// <remarks>
      /// The entity code of a built-in entity is never changed. For other entities a changed code must
      /// stay unique within the entity's tenant. A blank <c>JobId</c> or missing source keeps the stored value.
      /// </remarks>
      [DisplayName("更新同步实体")]
      [HttpPut("entities/{id:long}")]
      public async Task<object> UpdateEntity(long id, [FromBody] MdpEntityUpsertInput input)
      {
          ValidateEntityUpsert(input);
          var tenantId = _userManager.TenantId;
          var entity = await FindEditableEntityAsync(id, tenantId);

          // Validate a rename before touching the entity: the code must stay unique per tenant,
          // mirroring the check performed by CreateEntity.
          var canRename = !IsBuiltInEntity(entity);
          var newCode = input.EntityCode.Trim();
          if (canRename && !string.Equals(newCode, entity.EntityCode, StringComparison.Ordinal))
          {
              var ownerTenantId = entity.TenantId;
              var duplicate = await _db.Queryable<MdpEntity>()
                  .Where(u => u.TenantId == ownerTenantId && u.EntityCode == newCode && u.Id != id)
                  .AnyAsync();
              if (duplicate)
                  throw Oops.Oh("实体编码已存在");
          }

          entity.EntityName = input.EntityName.Trim();
          entity.EntityType = string.IsNullOrWhiteSpace(input.EntityType) ? "TABLE" : input.EntityType.Trim();
          entity.SourceId = await ResolveSourceIdAsync(input, tenantId, entity.SourceId);
          entity.SourceTableName = input.SourceTableName?.Trim();
          entity.SourceApiPath = input.SourceApiPath?.Trim();
          entity.TargetTableName = input.TargetTableName?.Trim();
          entity.SyncMode = string.IsNullOrWhiteSpace(input.SyncMode) ? "INCR" : input.SyncMode.Trim();
          entity.IncrColumn = input.IncrColumn?.Trim();
          entity.BatchSize = input.BatchSize <= 0 ? 5000 : input.BatchSize;

          // Keep the stored job id when none is supplied: overwriting it with null would drop the
          // entity from the job_id filter used by GetEntities.
          if (!string.IsNullOrWhiteSpace(input.JobId))
              entity.JobId = input.JobId.Trim();

          entity.Status = input.Status <= 0 ? 0 : 1;
          entity.Remark = input.Remark?.Trim();
          entity.UpdateTime = DateTime.Now;
          if (canRename)
              entity.EntityCode = newCode;

          await _db.Updateable(entity).ExecuteCommandAsync();
          return new { id = entity.Id };
      }

      /// <summary>
      /// Soft-deletes an entity by setting its status to <c>0</c>.
      /// </summary>
      /// <param name="id">Id of the entity to delete.</param>
      /// <returns>An object carrying the entity <c>id</c>.</returns>
      /// <remarks>Built-in entities stored under tenant <c>0</c> cannot be deleted.</remarks>
      [DisplayName("删除同步实体")]
      [HttpDelete("entities/{id:long}")]
      public async Task<object> DeleteEntity(long id)
      {
          var entity = await FindEditableEntityAsync(id, _userManager.TenantId);
          var isProtected = entity.TenantId == 0 && IsBuiltInEntity(entity);
          if (isProtected)
              throw Oops.Oh("内置 MDP 实体不允许删除");

          entity.Status = 0;
          entity.UpdateTime = DateTime.Now;
          await _db.Updateable(entity)
              .UpdateColumns(u => new { u.Status, u.UpdateTime })
              .ExecuteCommandAsync();
          return new { id = entity.Id };
      }

      /// <summary>
      /// Lists the field mappings of an entity, ordered by sort order and then by target field.
      /// </summary>
      /// <param name="entityId">Id of the entity; it must be visible to the current tenant.</param>
      /// <returns>A list of <c>MdpFieldMappingRow</c>.</returns>
      [DisplayName("实体字段映射列表")]
      [HttpGet("entities/{entityId:long}/field-mappings")]
      public async Task<object> GetFieldMappings(long entityId)
      {
          // Called only for its access check: throws when the entity is not visible to the tenant.
          await FindEditableEntityAsync(entityId, _userManager.TenantId);

          var mappings = await _db.Queryable<MdpFieldMapping>()
              .Where(u => u.EntityId == entityId)
              .OrderBy(u => u.SortOrder)
              .OrderBy(u => u.TargetField)
              .ToListAsync();
          return mappings.Select(MapFieldMappingRow).ToList();
      }

      /// <summary>
      /// Adds a field mapping to an entity; the target field must be unique within the entity.
      /// </summary>
      /// <param name="entityId">Id of the owning entity.</param>
      /// <param name="input">Mapping values; source and target field are required.</param>
      /// <returns>An object carrying the generated <c>id</c>.</returns>
      [DisplayName("新增实体字段映射")]
      [HttpPost("entities/{entityId:long}/field-mappings")]
      public async Task<object> CreateFieldMapping(long entityId, [FromBody] MdpFieldMappingUpsertInput input)
      {
          ValidateFieldMappingUpsert(input);
          await FindEditableEntityAsync(entityId, _userManager.TenantId);

          var targetField = input.TargetField.Trim();
          var exists = await _db.Queryable<MdpFieldMapping>()
              .Where(u => u.EntityId == entityId && u.TargetField == targetField)
              .AnyAsync();
          if (exists)
              throw Oops.Oh("目标字段映射已存在");

          var mapping = MapFieldMappingInsert(entityId, input);
          var id = await _db.Insertable(mapping).ExecuteReturnBigIdentityAsync();
          return new { id };
      }

      /// <summary>
      /// Updates a field mapping; the target field must stay unique within the owning entity.
      /// </summary>
      /// <param name="id">Id of the field mapping.</param>
      /// <param name="input">New mapping values; source and target field are required.</param>
      /// <returns>An object carrying the mapping <c>id</c>.</returns>
      [DisplayName("更新字段映射")]
      [HttpPut("field-mappings/{id:long}")]
      public async Task<object> UpdateFieldMapping(long id, [FromBody] MdpFieldMappingUpsertInput input)
      {
          ValidateFieldMappingUpsert(input);
          var tenantId = _userManager.TenantId;
          var mapping = await _db.Queryable<MdpFieldMapping>().Where(u => u.Id == id).FirstAsync();
          if (mapping == null)
              throw Oops.Oh("字段映射不存在");

          // Access check on the owning entity.
          await FindEditableEntityAsync(mapping.EntityId, tenantId);

          var owningEntityId = mapping.EntityId;
          var targetField = input.TargetField.Trim();
          var duplicate = await _db.Queryable<MdpFieldMapping>()
              .Where(u => u.EntityId == owningEntityId && u.TargetField == targetField && u.Id != id)
              .AnyAsync();
          if (duplicate)
              throw Oops.Oh("目标字段映射已存在");

          mapping.SourceField = input.SourceField.Trim();
          mapping.TargetField = targetField;
          mapping.FieldType = NormalizeFieldType(input.FieldType);
          mapping.TransformScript = input.TransformScript?.Trim();
          mapping.ConstValue = input.ConstValue?.Trim();
          mapping.LookupTable = input.LookupTable?.Trim();
          mapping.IsRequired = input.IsRequired ? 1 : 0;
          mapping.DefaultValue = input.DefaultValue?.Trim();
          mapping.SortOrder = input.SortOrder;
          await _db.Updateable(mapping).ExecuteCommandAsync();
          return new { id = mapping.Id };
      }

      /// <summary>
      /// Permanently deletes a field mapping.
      /// </summary>
      /// <param name="id">Id of the field mapping.</param>
      /// <returns>An object carrying the deleted <c>id</c>.</returns>
      [DisplayName("删除字段映射")]
      [HttpDelete("field-mappings/{id:long}")]
      public async Task<object> DeleteFieldMapping(long id)
      {
          var mapping = await _db.Queryable<MdpFieldMapping>().Where(u => u.Id == id).FirstAsync();
          if (mapping == null)
              throw Oops.Oh("字段映射不存在");

          // Access check on the owning entity.
          await FindEditableEntityAsync(mapping.EntityId, _userManager.TenantId);
          await _db.Deleteable<MdpFieldMapping>().Where(u => u.Id == id).ExecuteCommandAsync();
          return new { id };
      }

      /// <summary>
      /// Loads the active sync task with the given code that belongs to the tenant or to tenant <c>0</c>.
      /// When both exist, the row with the higher tenant id is returned.
      /// </summary>
      private async Task<MdpSyncTask> FindTaskAsync(string taskCode, long tenantId)
      {
          if (string.IsNullOrWhiteSpace(taskCode))
              throw Oops.Oh("任务编码不能为空");

          var task = await _db.Queryable<MdpSyncTask>()
              .Where(u => u.TaskCode == taskCode && (u.TenantId == tenantId || u.TenantId == 0) && u.Status == 1)
              .OrderBy(u => u.TenantId, OrderByType.Desc)
              .FirstAsync();
          return task ?? throw Oops.Oh("同步任务不存在");
      }

      /// <summary>
      /// Loads an entity by id when it belongs to the tenant or to tenant <c>0</c>; throws otherwise.
      /// </summary>
      private async Task<MdpEntity> FindEditableEntityAsync(long id, long tenantId)
      {
          var entity = await _db.Queryable<MdpEntity>()
              .Where(u => u.Id == id && (u.TenantId == tenantId || u.TenantId == 0))
              .FirstAsync();
          return entity ?? throw Oops.Oh("同步实体不存在");
      }

      /// <summary>
      /// Resolves the data source id for an entity from, in order: <c>input.SourceId</c>,
      /// <c>input.SourceCode</c>, then <paramref name="fallbackSourceId"/>.
      /// </summary>
      /// <param name="input">Upsert input that may carry a source id or a source code.</param>
      /// <param name="tenantId">Current tenant; sources of this tenant and of tenant <c>0</c> are accepted.</param>
      /// <param name="fallbackSourceId">Source id to use when the input names no source (used on update).</param>
      /// <returns>The id of an existing source visible to the tenant.</returns>
      private async Task<long> ResolveSourceIdAsync(MdpEntityUpsertInput input, long tenantId, long? fallbackSourceId = null)
      {
          if (input.SourceId > 0)
          {
              // Apply the same tenant scope as the source-code lookup below, so a caller cannot
              // attach a source that does not exist or belongs to another tenant.
              var visibleSourceId = await _db.Ado.GetLongAsync(
                  """
                  SELECT id FROM mdp_source
                  WHERE id = @SourceId AND (tenant_id = @TenantId OR tenant_id = 0)
                  LIMIT 1
                  """,
                  new List<SugarParameter>
                  {
                      new("@SourceId", input.SourceId),
                      new("@TenantId", tenantId)
                  });
              if (visibleSourceId > 0)
                  return visibleSourceId;
              throw Oops.Oh($"数据源 {input.SourceId} 不存在");
          }

          if (!string.IsNullOrWhiteSpace(input.SourceCode))
          {
              // ORDER BY tenant_id DESC returns the row with the higher tenant id first.
              var sourceId = await _db.Ado.GetLongAsync(
                  """
                  SELECT id FROM mdp_source
                  WHERE source_code = @SourceCode AND (tenant_id = @TenantId OR tenant_id = 0)
                  ORDER BY tenant_id DESC
                  LIMIT 1
                  """,
                  new List<SugarParameter>
                  {
                      new("@SourceCode", input.SourceCode.Trim()),
                      new("@TenantId", tenantId)
                  });
              if (sourceId > 0)
                  return sourceId;
              throw Oops.Oh($"数据源 {input.SourceCode} 不存在");
          }

          if (fallbackSourceId is > 0)
              return fallbackSourceId.Value;
          throw Oops.Oh("请指定 sourceId 或 sourceCode");
      }

      /// <summary>
      /// Increments the task's configuration version and refreshes its update time.
      /// </summary>
      /// <remarks>
      /// NOTE(review): the version is incremented in memory and then written back, so two concurrent
      /// calls may store the same value — confirm whether that matters for consumers.
      /// </remarks>
      private async Task BumpTaskConfigVersionAsync(MdpSyncTask task)
      {
          task.ConfigVersion += 1;
          task.UpdateTime = DateTime.Now;
          await _db.Updateable(task)
              .UpdateColumns(u => new { u.ConfigVersion, u.UpdateTime })
              .ExecuteCommandAsync();
      }

      /// <summary>
      /// Derives the entity-code prefix from a task code: the first underscore-separated segment plus
      /// <c>"_"</c> when that segment starts with <c>'S'</c> followed by a digit; otherwise <c>null</c>.
      /// </summary>
      private static string? ResolveEntityCodePrefix(string taskCode)
      {
          if (string.IsNullOrWhiteSpace(taskCode))
              return null;

          var first = taskCode.Split('_', StringSplitOptions.RemoveEmptyEntries).FirstOrDefault();
          var looksLikeStage = !string.IsNullOrWhiteSpace(first)
              && first.Length >= 2
              && first[0] == 'S'
              && char.IsDigit(first[1]);
          return looksLikeStage ? first + "_" : null;
      }

      /// <summary>
      /// Escapes the LIKE wildcards (<c>%</c>, <c>_</c>) and the escape character itself so that
      /// <paramref name="value"/> is matched literally by <c>LIKE ... ESCAPE</c>.
      /// </summary>
      private static string EscapeLikePattern(string value) =>
          value
              // The escape character must be doubled first, before it is used to escape the wildcards.
              .Replace(LikeEscape, LikeEscape + LikeEscape)
              .Replace("%", LikeEscape + "%")
              .Replace("_", LikeEscape + "_");

      /// <summary>
      /// Tells whether the entity code starts with one of the built-in prefixes (case-insensitive).
      /// </summary>
      private static bool IsBuiltInEntity(MdpEntity entity) =>
          BuiltInEntityCodePrefixes.Any(p => entity.EntityCode.StartsWith(p, StringComparison.OrdinalIgnoreCase));

      /// <summary>Ensures the entity input carries a non-blank code and name.</summary>
      private static void ValidateEntityUpsert(MdpEntityUpsertInput input)
      {
          if (string.IsNullOrWhiteSpace(input.EntityCode))
              throw Oops.Oh("实体编码不能为空");
          if (string.IsNullOrWhiteSpace(input.EntityName))
              throw Oops.Oh("实体名称不能为空");
      }

      /// <summary>Ensures the field mapping input carries a non-blank source and target field.</summary>
      private static void ValidateFieldMappingUpsert(MdpFieldMappingUpsertInput input)
      {
          if (string.IsNullOrWhiteSpace(input.SourceField))
              throw Oops.Oh("源字段不能为空");
          if (string.IsNullOrWhiteSpace(input.TargetField))
              throw Oops.Oh("目标字段不能为空");
      }

      /// <summary>
      /// Returns the trimmed, upper-cased field type, or <c>"DIRECT"</c> when it is blank or not one of
      /// <see cref="ValidFieldTypes"/>.
      /// </summary>
      private static string NormalizeFieldType(string? fieldType)
      {
          if (string.IsNullOrWhiteSpace(fieldType))
              return "DIRECT";

          var normalized = fieldType.Trim().ToUpperInvariant();
          return ValidFieldTypes.Contains(normalized) ? normalized : "DIRECT";
      }

      /// <summary>Builds a new field mapping row for <paramref name="entityId"/> from the upsert input.</summary>
      private static MdpFieldMapping MapFieldMappingInsert(long entityId, MdpFieldMappingUpsertInput input) =>
          new()
          {
              EntityId = entityId,
              SourceField = input.SourceField.Trim(),
              TargetField = input.TargetField.Trim(),
              FieldType = NormalizeFieldType(input.FieldType),
              TransformScript = input.TransformScript?.Trim(),
              ConstValue = input.ConstValue?.Trim(),
              LookupTable = input.LookupTable?.Trim(),
              IsRequired = input.IsRequired ? 1 : 0,
              DefaultValue = input.DefaultValue?.Trim(),
              SortOrder = input.SortOrder,
              CreateTime = DateTime.Now
          };

      /// <summary>Converts a stored field mapping into its API row; <c>IsRequired</c> becomes a boolean.</summary>
      private static MdpFieldMappingRow MapFieldMappingRow(MdpFieldMapping row) =>
          new()
          {
              Id = row.Id,
              EntityId = row.EntityId,
              SourceField = row.SourceField,
              TargetField = row.TargetField,
              FieldType = row.FieldType,
              TransformScript = row.TransformScript,
              ConstValue = row.ConstValue,
              LookupTable = row.LookupTable,
              IsRequired = row.IsRequired != 0,
              DefaultValue = row.DefaultValue,
              SortOrder = row.SortOrder
          };
  }