|
|
@@ -0,0 +1,858 @@
|
|
|
+namespace Admin.NET.Plugin.AiDOP.Order;
|
|
|
+
|
|
|
+/// <summary>
|
|
|
+/// S1 首批 MDP 同步和标准化转换服务。
|
|
|
+/// </summary>
|
|
|
+public class S1MdpSyncTransformService : ITransient
|
|
|
+{
|
|
|
+ private const string JobCode = "S1_MDP_SYNC_TRANSFORM";
|
|
|
+ private readonly ISqlSugarClient _db;
|
|
|
+
|
|
|
+ public S1MdpSyncTransformService(ISqlSugarClient db)
|
|
|
+ {
|
|
|
+ _db = db;
|
|
|
+ }
|
|
|
+
|
|
|
+ public async Task<S1MdpSyncTransformResult> RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO")
|
|
|
+ {
|
|
|
+ cancellationToken.ThrowIfCancellationRequested();
|
|
|
+ var now = DateTime.Now;
|
|
|
+ var batchId = $"S1_MDP_FULL_{now:yyyyMMddHHmmss}";
|
|
|
+ var runLogId = await InsertTransformRunLogAsync(batchId, now, triggerType);
|
|
|
+ var result = new S1MdpSyncTransformResult { BatchId = batchId, RunLogId = runLogId };
|
|
|
+
|
|
|
+ try
|
|
|
+ {
|
|
|
+ result.StageRows = await SyncStagingAsync(batchId, now, cancellationToken);
|
|
|
+ result.StandardRows = await TransformStandardAsync(batchId, now, cancellationToken);
|
|
|
+ result.DwdRows = await BuildDwdAsync(batchId, now, cancellationToken);
|
|
|
+ result.KpiRows = await BuildS1KpiValuesAsync(now, cancellationToken);
|
|
|
+ await MarkTransformRunSuccessAsync(runLogId, now, result);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ await MarkTransformRunFailedAsync(runLogId, now, ex.Message);
|
|
|
+ throw;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task<int> SyncStagingAsync(string batchId, DateTime now, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ var total = 0;
|
|
|
+ foreach (var entity in S1MdpEntityConfig.All)
|
|
|
+ {
|
|
|
+ cancellationToken.ThrowIfCancellationRequested();
|
|
|
+ total += await SyncOneEntityAsync(entity, batchId, now);
|
|
|
+ }
|
|
|
+ return total;
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task<int> SyncOneEntityAsync(S1MdpEntityConfig entity, string batchId, DateTime now)
|
|
|
+ {
|
|
|
+ var entityRow = await _db.Ado.SqlQuerySingleAsync<S1MdpEntityRow>(
|
|
|
+ "SELECT id AS Id, entity_name AS EntityName FROM mdp_entity WHERE tenant_id=0 AND entity_code=@EntityCode LIMIT 1",
|
|
|
+ new SugarParameter("@EntityCode", entity.EntityCode));
|
|
|
+ if (entityRow == null) throw Oops.Oh($"未找到 MDP 实体配置:{entity.EntityCode}");
|
|
|
+
|
|
|
+ var columns = await _db.Ado.SqlQueryAsync<S1ColumnRow>(
|
|
|
+ """
|
|
|
+ SELECT COLUMN_NAME AS ColumnName
|
|
|
+ FROM information_schema.COLUMNS
|
|
|
+ WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME=@TableName
|
|
|
+ ORDER BY ORDINAL_POSITION
|
|
|
+ """,
|
|
|
+ new SugarParameter("@TableName", entity.SourceTable));
|
|
|
+ if (columns.Count == 0) throw Oops.Oh($"未找到源表:{entity.SourceTable}");
|
|
|
+
|
|
|
+ var names = columns.Select(u => u.ColumnName).ToList();
|
|
|
+ var tenantExpr = BuildOptionalColumnExpr(names, "tenant_id", "0");
|
|
|
+ var factoryExpr = BuildOptionalColumnExpr(names, "factory_id", "NULL");
|
|
|
+ var companyExpr = BuildOptionalColumnExpr(names, "company_id", "NULL");
|
|
|
+ var sourceRowExpr = names.Any(u => string.Equals(u, entity.SourceRowIdExpression, StringComparison.OrdinalIgnoreCase))
|
|
|
+ ? $"s.`{FindColumn(names, entity.SourceRowIdExpression)}`"
|
|
|
+ : entity.SourceRowIdExpression;
|
|
|
+ var rawDataExpr = BuildJsonObjectExpression(names);
|
|
|
+
|
|
|
+ var rowsRead = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM `{entity.SourceTable}`");
|
|
|
+ var logId = await InsertSyncLogAsync(entityRow.Id, entityRow.EntityName, batchId, rowsRead);
|
|
|
+ var started = DateTime.Now;
|
|
|
+
|
|
|
+ try
|
|
|
+ {
|
|
|
+ var affected = await _db.Ado.ExecuteCommandAsync(
|
|
|
+ $"""
|
|
|
+ INSERT INTO `{entity.TargetTable}`
|
|
|
+ (tenant_id, factory_id, company_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data)
|
|
|
+ SELECT
|
|
|
+ {tenantExpr},
|
|
|
+ {factoryExpr},
|
|
|
+ {companyExpr},
|
|
|
+ 'AIDOP',
|
|
|
+ @SourceTable,
|
|
|
+ CAST({sourceRowExpr} AS CHAR),
|
|
|
+ CAST(COALESCE({entity.SourceBizKeyExpression}, CAST({sourceRowExpr} AS CHAR)) AS CHAR),
|
|
|
+ @BatchId,
|
|
|
+ @Now,
|
|
|
+ 'PENDING',
|
|
|
+ {rawDataExpr}
|
|
|
+ FROM `{entity.SourceTable}` s
|
|
|
+ ON DUPLICATE KEY UPDATE
|
|
|
+ tenant_id=VALUES(tenant_id),
|
|
|
+ factory_id=VALUES(factory_id),
|
|
|
+ company_id=VALUES(company_id),
|
|
|
+ source_row_id=VALUES(source_row_id),
|
|
|
+ sync_batch_id=VALUES(sync_batch_id),
|
|
|
+ sync_time=VALUES(sync_time),
|
|
|
+ process_status=VALUES(process_status),
|
|
|
+ raw_data=VALUES(raw_data),
|
|
|
+ update_time=CURRENT_TIMESTAMP
|
|
|
+ """,
|
|
|
+ new SugarParameter("@SourceTable", entity.SourceTable),
|
|
|
+ new SugarParameter("@BatchId", batchId),
|
|
|
+ new SugarParameter("@Now", now));
|
|
|
+
|
|
|
+ await MarkSyncLogSuccessAsync(logId, started, affected);
|
|
|
+ return rowsRead;
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ await MarkSyncLogFailedAsync(logId, started, ex.Message);
|
|
|
+ throw;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task<int> TransformStandardAsync(string batchId, DateTime now, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ var total = 0;
|
|
|
+ foreach (var command in BuildStandardCommands(batchId, now))
|
|
|
+ {
|
|
|
+ cancellationToken.ThrowIfCancellationRequested();
|
|
|
+ total += await _db.Ado.ExecuteCommandAsync(command.Sql, command.Parameters);
|
|
|
+ }
|
|
|
+ return total;
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task<int> BuildDwdAsync(string batchId, DateTime now, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ var total = 0;
|
|
|
+ foreach (var command in BuildDwdCommands(batchId, now))
|
|
|
+ {
|
|
|
+ cancellationToken.ThrowIfCancellationRequested();
|
|
|
+ total += await _db.Ado.ExecuteCommandAsync(command.Sql, command.Parameters);
|
|
|
+ }
|
|
|
+ return total;
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task<int> BuildS1KpiValuesAsync(DateTime now, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ var statDate = now.Date;
|
|
|
+ var rows = await CalculateS1KpiValuesAsync(statDate);
|
|
|
+ var affected = 0;
|
|
|
+ foreach (var row in rows)
|
|
|
+ {
|
|
|
+ cancellationToken.ThrowIfCancellationRequested();
|
|
|
+ affected += await UpsertS1KpiValueAsync(row, statDate, now);
|
|
|
+ }
|
|
|
+
|
|
|
+ return affected;
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task<List<S1KpiCalcRow>> CalculateS1KpiValuesAsync(DateTime statDate)
|
|
|
+ {
|
|
|
+ return await _db.Ado.SqlQueryAsync<S1KpiCalcRow>(
|
|
|
+ """
|
|
|
+ SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
|
|
|
+ 'S1_L1_001' AS MetricCode,
|
|
|
+ ROUND(AVG(TIMESTAMPDIFF(HOUR, order_date, COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date)) / 24), 4) AS MetricValue
|
|
|
+ FROM mdp_std_so
|
|
|
+ WHERE order_date IS NOT NULL
|
|
|
+ AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) IS NOT NULL
|
|
|
+ AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) >= order_date
|
|
|
+ GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
|
|
|
+ UNION ALL
|
|
|
+ SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
|
|
|
+ 'S1_L1_002' AS MetricCode,
|
|
|
+ ROUND(100 * SUM(CASE WHEN TIMESTAMPDIFF(HOUR, order_date, COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date)) <= 72 THEN 1 ELSE 0 END) / COUNT(1), 4) AS MetricValue
|
|
|
+ FROM mdp_std_so
|
|
|
+ WHERE order_date IS NOT NULL
|
|
|
+ AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) IS NOT NULL
|
|
|
+ GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
|
|
|
+ UNION ALL
|
|
|
+ SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
|
|
|
+ 'S1_L1_003' AS MetricCode,
|
|
|
+ ROUND(COUNT(1) / GREATEST(COUNT(DISTINCT NULLIF(planner_no, '')), 1), 4) AS MetricValue
|
|
|
+ FROM mdp_std_so
|
|
|
+ WHERE order_date IS NOT NULL AND order_date <= @StatDate
|
|
|
+ GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
|
|
|
+ UNION ALL
|
|
|
+ SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
|
|
|
+ 'S1_L2_010' AS MetricCode,
|
|
|
+ ROUND(AVG(TIMESTAMPDIFF(HOUR, order_date, COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date)) / 24), 4) AS MetricValue
|
|
|
+ FROM mdp_std_so
|
|
|
+ WHERE order_date IS NOT NULL
|
|
|
+ AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) IS NOT NULL
|
|
|
+ AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) >= order_date
|
|
|
+ GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
|
|
|
+ UNION ALL
|
|
|
+ SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
|
|
|
+ 'S1_L2_011' AS MetricCode,
|
|
|
+ ROUND(100 * SUM(CASE WHEN TIMESTAMPDIFF(HOUR, order_date, COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date)) <= 72 THEN 1 ELSE 0 END) / COUNT(1), 4) AS MetricValue
|
|
|
+ FROM mdp_std_so
|
|
|
+ WHERE order_date IS NOT NULL
|
|
|
+ AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) IS NOT NULL
|
|
|
+ GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
|
|
|
+ UNION ALL
|
|
|
+ SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
|
|
|
+ 'S1_L2_012' AS MetricCode,
|
|
|
+ ROUND(COUNT(1) / GREATEST(COUNT(DISTINCT NULLIF(planner_no, '')), 1), 4) AS MetricValue
|
|
|
+ FROM mdp_std_so
|
|
|
+ WHERE order_date IS NOT NULL AND order_date <= @StatDate
|
|
|
+ GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
|
|
|
+ """,
|
|
|
+ new SugarParameter("@StatDate", statDate));
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task<int> UpsertS1KpiValueAsync(S1KpiCalcRow row, DateTime statDate, DateTime now)
|
|
|
+ {
|
|
|
+ var meta = await _db.Ado.SqlQuerySingleAsync<S1KpiMetaRow>(
|
|
|
+ """
|
|
|
+ SELECT MetricLevel, Direction, YellowThreshold, RedThreshold
|
|
|
+ FROM ado_smart_ops_kpi_master
|
|
|
+ WHERE TenantId=@TenantId AND ModuleCode='S1' AND MetricCode=@MetricCode AND IsEnabled=1
|
|
|
+ LIMIT 1
|
|
|
+ """,
|
|
|
+ new SugarParameter("@TenantId", row.TenantId),
|
|
|
+ new SugarParameter("@MetricCode", row.MetricCode));
|
|
|
+ if (meta == null || row.MetricValue == null)
|
|
|
+ return 0;
|
|
|
+
|
|
|
+ var table = ResolveKpiValueTable(meta.MetricLevel);
|
|
|
+ var current = await _db.Ado.SqlQuerySingleAsync<S1KpiValueRow>(
|
|
|
+ $"""
|
|
|
+ SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
|
|
|
+ FROM {table}
|
|
|
+ WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S1'
|
|
|
+ AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
|
|
|
+ ORDER BY id
|
|
|
+ LIMIT 1
|
|
|
+ """,
|
|
|
+ new SugarParameter("@TenantId", row.TenantId),
|
|
|
+ new SugarParameter("@FactoryId", row.FactoryId),
|
|
|
+ new SugarParameter("@MetricCode", row.MetricCode),
|
|
|
+ new SugarParameter("@BizDate", statDate));
|
|
|
+ var prior = await _db.Ado.SqlQuerySingleAsync<S1KpiValueRow>(
|
|
|
+ $"""
|
|
|
+ SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
|
|
|
+ FROM {table}
|
|
|
+ WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S1'
|
|
|
+ AND metric_code=@MetricCode AND biz_date<@BizDate AND is_deleted=0
|
|
|
+ ORDER BY biz_date DESC, id DESC
|
|
|
+ LIMIT 1
|
|
|
+ """,
|
|
|
+ new SugarParameter("@TenantId", row.TenantId),
|
|
|
+ new SugarParameter("@FactoryId", row.FactoryId),
|
|
|
+ new SugarParameter("@MetricCode", row.MetricCode),
|
|
|
+ new SugarParameter("@BizDate", statDate));
|
|
|
+
|
|
|
+ var actual = Math.Round(row.MetricValue.Value, 4);
|
|
|
+ var target = current?.TargetValue ?? prior?.TargetValue ?? DefaultS1Target(row.MetricCode);
|
|
|
+ var status = ResolveKpiStatus(actual, target, meta.Direction, meta.YellowThreshold, meta.RedThreshold);
|
|
|
+ var trend = ResolveTrendFlag(actual, prior?.MetricValue);
|
|
|
+
|
|
|
+ if (current != null)
|
|
|
+ {
|
|
|
+ return await _db.Ado.ExecuteCommandAsync(
|
|
|
+ $"""
|
|
|
+ UPDATE {table}
|
|
|
+ SET metric_value=@MetricValue, target_value=@TargetValue, status_color=@StatusColor, trend_flag=@TrendFlag,
|
|
|
+ is_active=1, status='ACTIVE', calc_time=@CalcTime, update_time=@CalcTime
|
|
|
+ WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S1'
|
|
|
+ AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
|
|
|
+ """,
|
|
|
+ new SugarParameter("@MetricValue", actual),
|
|
|
+ new SugarParameter("@TargetValue", target),
|
|
|
+ new SugarParameter("@StatusColor", status),
|
|
|
+ new SugarParameter("@TrendFlag", trend),
|
|
|
+ new SugarParameter("@CalcTime", now),
|
|
|
+ new SugarParameter("@TenantId", row.TenantId),
|
|
|
+ new SugarParameter("@FactoryId", row.FactoryId),
|
|
|
+ new SugarParameter("@MetricCode", row.MetricCode),
|
|
|
+ new SugarParameter("@BizDate", statDate));
|
|
|
+ }
|
|
|
+
|
|
|
+ var nextId = await _db.Ado.GetLongAsync($"SELECT COALESCE(MAX(id), 0) + 1 FROM {table}");
|
|
|
+ return await _db.Ado.ExecuteCommandAsync(
|
|
|
+ $"""
|
|
|
+ INSERT INTO {table}
|
|
|
+ (id, tenant_id, factory_id, status, biz_date, create_time, update_time, is_deleted, is_active,
|
|
|
+ module_code, metric_code, metric_value, target_value, status_color, trend_flag, calc_time)
|
|
|
+ VALUES
|
|
|
+ (@Id, @TenantId, @FactoryId, 'ACTIVE', @BizDate, @CalcTime, @CalcTime, 0, 1,
|
|
|
+ 'S1', @MetricCode, @MetricValue, @TargetValue, @StatusColor, @TrendFlag, @CalcTime)
|
|
|
+ """,
|
|
|
+ new SugarParameter("@Id", nextId),
|
|
|
+ new SugarParameter("@TenantId", row.TenantId),
|
|
|
+ new SugarParameter("@FactoryId", row.FactoryId),
|
|
|
+ new SugarParameter("@BizDate", statDate),
|
|
|
+ new SugarParameter("@CalcTime", now),
|
|
|
+ new SugarParameter("@MetricCode", row.MetricCode),
|
|
|
+ new SugarParameter("@MetricValue", actual),
|
|
|
+ new SugarParameter("@TargetValue", target),
|
|
|
+ new SugarParameter("@StatusColor", status),
|
|
|
+ new SugarParameter("@TrendFlag", trend));
|
|
|
+ }
|
|
|
+
|
|
|
+ private IEnumerable<S1MdpSqlCommand> BuildStandardCommands(string batchId, DateTime now)
|
|
|
+ {
|
|
|
+ yield return Cmd(
|
|
|
+ """
|
|
|
+ INSERT INTO mdp_std_so
|
|
|
+ (tenant_id, factory_id, company_id, source_system, order_id, order_entry_id, order_no, order_line, order_type,
|
|
|
+ customer_id, customer_no, customer_name, customer_order_no, country, item_code, item_name, item_spec,
|
|
|
+ map_number, map_name, bom_number, unit, order_qty, delivered_notice_qty, delivered_qty, price, tax_price,
|
|
|
+ amount, total_amount, order_date, customer_request_date, plan_delivery_date, promised_delivery_date,
|
|
|
+ capacity_date, material_ready_date, planner_no, planner_name, order_status, review_status, review_stage,
|
|
|
+ flow_state, progress, urgent, closed, deleted_flag, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time)
|
|
|
+ SELECT
|
|
|
+ COALESCE(e.tenant_id, h.tenant_id, 0),
|
|
|
+ COALESCE(e.factory_id, h.factory_id),
|
|
|
+ e.company_id,
|
|
|
+ 'AIDOP',
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.Id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.Id')) AS SIGNED) END,
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.Id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.Id')) AS SIGNED) END,
|
|
|
+ COALESCE(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.bill_no')), JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.bill_no')), e.source_biz_key),
|
|
|
+ CAST(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.entry_seq')), JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.Id'))) AS CHAR),
|
|
|
+ CAST(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.order_type')) AS CHAR),
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.custom_id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.custom_id')) AS SIGNED) END,
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.custom_no')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.custom_name')),
|
|
|
+ COALESCE(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.custom_order_bill_no')), JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.bill_from'))),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.country')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.item_number')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.item_name')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.specification')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.map_number')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.map_name')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.bom_number')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.unit')),
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.qty')) AS DECIMAL(18,6)) END,
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.deliver_notice_count')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.deliver_notice_count')) AS DECIMAL(18,6)) END,
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.deliver_count')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.deliver_count')) AS DECIMAL(18,6)) END,
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.price')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.price')) AS DECIMAL(18,6)) END,
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.tax_price')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.tax_price')) AS DECIMAL(18,6)) END,
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.amount')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.amount')) AS DECIMAL(18,6)) END,
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.total_amount')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.total_amount')) AS DECIMAL(18,6)) END,
|
|
|
+ NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.date')), 'null'), ''),
|
|
|
+ NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.rdate')), 'null'), ''),
|
|
|
+ NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.plan_date')), 'null'), ''),
|
|
|
+ NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.date')), 'null'), ''),
|
|
|
+ NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.sys_capacity_date')), 'null'), ''),
|
|
|
+ NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.sys_material_date')), 'null'), ''),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.planner_no')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.planner_name')),
|
|
|
+ CASE WHEN JSON_EXTRACT(h.raw_data,'$.closed') IN (1, true) THEN 'CLOSED' ELSE 'OPEN' END,
|
|
|
+ COALESCE(JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.FlowStatus')), JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.flowstate'))),
|
|
|
+ COALESCE(JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.CurrentDept')), JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.CurrentStage'))),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.flowstate')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.progress')),
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(COALESCE(e.raw_data, h.raw_data),'$.urgent')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(COALESCE(e.raw_data, h.raw_data),'$.urgent')) AS SIGNED) END,
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.closed')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.closed')) AS SIGNED) END,
|
|
|
+ COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(COALESCE(e.raw_data, h.raw_data),'$.IsDeleted')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(COALESCE(e.raw_data, h.raw_data),'$.IsDeleted')) AS SIGNED) END, 0),
|
|
|
+ e.source_table,
|
|
|
+ e.source_row_id,
|
|
|
+ e.source_biz_key,
|
|
|
+ @BatchId,
|
|
|
+ @Now
|
|
|
+ FROM mdp_stg_so e
|
|
|
+ LEFT JOIN mdp_stg_so h ON h.source_table='crm_seorder'
|
|
|
+ AND JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.Id')) = JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.seorder_id'))
|
|
|
+ LEFT JOIN mdp_stg_so r ON r.source_table='ado_contract_review'
|
|
|
+ AND JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.BillNo')) = COALESCE(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.contract_no')), JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.bill_no')), JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.bill_no')))
|
|
|
+ WHERE e.source_table='crm_seorderentry'
|
|
|
+ AND IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.bill_no')), JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.bill_no'))), '') <> ''
|
|
|
+ ON DUPLICATE KEY UPDATE
|
|
|
+ customer_no=VALUES(customer_no), customer_name=VALUES(customer_name), item_code=VALUES(item_code),
|
|
|
+ item_name=VALUES(item_name), item_spec=VALUES(item_spec), order_qty=VALUES(order_qty),
|
|
|
+ delivered_notice_qty=VALUES(delivered_notice_qty), delivered_qty=VALUES(delivered_qty),
|
|
|
+ plan_delivery_date=VALUES(plan_delivery_date), promised_delivery_date=VALUES(promised_delivery_date),
|
|
|
+ capacity_date=VALUES(capacity_date), material_ready_date=VALUES(material_ready_date),
|
|
|
+ order_status=VALUES(order_status), review_status=VALUES(review_status), review_stage=VALUES(review_stage),
|
|
|
+ flow_state=VALUES(flow_state), progress=VALUES(progress), deleted_flag=VALUES(deleted_flag),
|
|
|
+ sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
|
|
|
+ """, batchId, now);
|
|
|
+
|
|
|
+ yield return Cmd(
|
|
|
+ """
|
|
|
+ INSERT INTO mdp_std_ship_trans
|
|
|
+ (tenant_id, factory_id, company_id, source_system, trans_type, plan_id, plan_no, plan_line, order_id, order_entry_id,
|
|
|
+ order_no, order_line, customer_no, customer_name, country, item_code, item_name, item_spec, qty, plan_qty,
|
|
|
+ weight, volume, order_date, plan_ship_date, shipping_site, shipping_address, consignee, telephone,
|
|
|
+ status, confirm_status, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time)
|
|
|
+ SELECT
|
|
|
+ IFNULL(d.tenant_id, 0),
|
|
|
+ d.factory_id,
|
|
|
+ d.company_id,
|
|
|
+ 'AIDOP',
|
|
|
+ 'SHIP_PLAN',
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.plan_id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.plan_id')) AS SIGNED) END,
|
|
|
+ COALESCE(JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.LotSerial')), CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.plan_id')) AS CHAR)),
|
|
|
+ CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.RecID')) AS CHAR),
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.seorder_id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.seorder_id')) AS SIGNED) END,
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.sentry_id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.sentry_id')) AS SIGNED) END,
|
|
|
+ COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.bill_no')), JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.OrdNbr'))),
|
|
|
+ CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.sentry_id')) AS CHAR),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.CustomNo')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.CustomName')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Country')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ItemNum')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ItemName')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Specification')),
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Qty')) AS DECIMAL(18,6)) END,
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Qty')) AS DECIMAL(18,6)) END,
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Weight')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Weight')) AS DECIMAL(18,6)) END,
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Volume')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Volume')) AS DECIMAL(18,6)) END,
|
|
|
+ NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.OrdDate')), 'null'), ''),
|
|
|
+ NULLIF(NULLIF(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.ShippingDate')), JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.CreateTime'))), 'null'), ''),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.ShippingSite')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.ShippingAddress')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Consignee')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Telephone')),
|
|
|
+ COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Status')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Status'))),
|
|
|
+ CAST(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.IsConfirm')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.IsConfirm'))) AS CHAR),
|
|
|
+ d.source_table,
|
|
|
+ d.source_row_id,
|
|
|
+ d.source_biz_key,
|
|
|
+ @BatchId,
|
|
|
+ @Now
|
|
|
+ FROM mdp_stg_ship_trans d
|
|
|
+ LEFT JOIN mdp_stg_ship_trans m ON m.source_table='ShippingPlan'
|
|
|
+ AND JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.RecID')) = JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.plan_id'))
|
|
|
+ WHERE d.source_table='ShippingPlanDetail'
|
|
|
+ AND IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.bill_no')), JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.OrdNbr'))), '') <> ''
|
|
|
+ ON DUPLICATE KEY UPDATE
|
|
|
+ plan_no=VALUES(plan_no), order_no=VALUES(order_no), customer_no=VALUES(customer_no), customer_name=VALUES(customer_name),
|
|
|
+ item_code=VALUES(item_code), item_name=VALUES(item_name), qty=VALUES(qty), plan_qty=VALUES(plan_qty),
|
|
|
+ plan_ship_date=VALUES(plan_ship_date), shipping_site=VALUES(shipping_site), shipping_address=VALUES(shipping_address),
|
|
|
+ status=VALUES(status), confirm_status=VALUES(confirm_status), sync_batch_id=VALUES(sync_batch_id),
|
|
|
+ sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
|
|
|
+ """, batchId, now);
|
|
|
+
|
|
|
+ yield return Cmd(
|
|
|
+ """
|
|
|
+ INSERT INTO mdp_std_ship_trans
|
|
|
+ (tenant_id, factory_id, source_system, trans_type, shipper_rec_id, shipper_no, shipper_line, order_no, order_line,
|
|
|
+ customer_no, item_code, item_name, qty_to_ship, picking_qty, real_qty, gross_weight, net_weight, volume,
|
|
|
+ plan_ship_date, actual_ship_date, site, status, confirm_status, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time)
|
|
|
+ SELECT
|
|
|
+ IFNULL(d.tenant_id, 0),
|
|
|
+ d.factory_id,
|
|
|
+ 'AIDOP',
|
|
|
+ 'ASN_SHIPPER',
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ASNBOLShipperRecID')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ASNBOLShipperRecID')) AS SIGNED) END,
|
|
|
+ COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Id')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Id'))),
|
|
|
+ CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Line')) AS CHAR),
|
|
|
+ COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.OrdNbr')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.OrdNbr'))),
|
|
|
+ CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.OrdLine')) AS CHAR),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.SoldTo')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ContainerItem')),
|
|
|
+ JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Descr')),
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyToShip')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyToShip')) AS DECIMAL(18,6)) END,
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.PickingQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.PickingQty')) AS DECIMAL(18,6)) END,
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.RealQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.RealQty')) AS DECIMAL(18,6)) END,
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.GrossWeight')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.GrossWeight')) AS DECIMAL(18,6)) END,
|
|
|
+ CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.NetWeight')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.NetWeight')) AS DECIMAL(18,6)) END,
|
|
|
+ CASE WHEN COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Volume')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Volume'))) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Volume')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Volume'))) AS DECIMAL(18,6)) END,
|
|
|
+ NULLIF(NULLIF(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ShipDate')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.ShipDate'))), 'null'), ''),
|
|
|
+ NULLIF(NULLIF(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ShipDate')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.ShipDate'))), 'null'), ''),
|
|
|
+ COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Site')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Site'))),
|
|
|
+ COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Status')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Status'))),
|
|
|
+ CAST(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.IsConfirm')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.IsConfirm'))) AS CHAR),
|
|
|
+ d.source_table,
|
|
|
+ d.source_row_id,
|
|
|
+ d.source_biz_key,
|
|
|
+ @BatchId,
|
|
|
+ @Now
|
|
|
+ FROM mdp_stg_ship_trans d
|
|
|
+ LEFT JOIN mdp_stg_ship_trans m ON m.source_table='ASNBOLShipperMaster'
|
|
|
+ AND JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.RecID')) = JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ASNBOLShipperRecID'))
|
|
|
+ WHERE d.source_table='ASNBOLShipperDetail'
|
|
|
+ AND IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Id')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Id'))), '') <> ''
|
|
|
+ ON DUPLICATE KEY UPDATE
|
|
|
+ shipper_no=VALUES(shipper_no), order_no=VALUES(order_no), order_line=VALUES(order_line),
|
|
|
+ customer_no=VALUES(customer_no), item_code=VALUES(item_code), item_name=VALUES(item_name),
|
|
|
+ qty_to_ship=VALUES(qty_to_ship), picking_qty=VALUES(picking_qty), real_qty=VALUES(real_qty),
|
|
|
+ actual_ship_date=VALUES(actual_ship_date), site=VALUES(site), status=VALUES(status),
|
|
|
+ confirm_status=VALUES(confirm_status), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
|
|
|
+ update_time=CURRENT_TIMESTAMP
|
|
|
+ """, batchId, now);
|
|
|
+ }
|
|
|
+
|
|
|
+ private IEnumerable<S1MdpSqlCommand> BuildDwdCommands(string batchId, DateTime now)
|
|
|
+ {
|
|
|
+ yield return Cmd(
|
|
|
+ """
|
|
|
+ INSERT INTO dwd_ship_trans
|
|
|
+ (tenant_id, factory_id, company_id, stat_date, order_id, order_entry_id, order_no, order_line, customer_no,
|
|
|
+ customer_name, country, item_code, item_name, item_spec, order_qty, planned_ship_qty, shipped_qty,
|
|
|
+ remaining_qty, order_date, customer_request_date, plan_delivery_date, promised_delivery_date,
|
|
|
+ plan_ship_date, actual_ship_date, review_status, order_status, delivery_status, linkage_status, risk_level,
|
|
|
+ source_system, source_table, source_row_id, source_biz_key, sync_batch_id, calc_batch_id, calc_time)
|
|
|
+ SELECT
|
|
|
+ so.tenant_id,
|
|
|
+ so.factory_id,
|
|
|
+ so.company_id,
|
|
|
+ @StatDate,
|
|
|
+ so.order_id,
|
|
|
+ so.order_entry_id,
|
|
|
+ so.order_no,
|
|
|
+ IFNULL(so.order_line, ''),
|
|
|
+ so.customer_no,
|
|
|
+ so.customer_name,
|
|
|
+ so.country,
|
|
|
+ IFNULL(so.item_code, ''),
|
|
|
+ so.item_name,
|
|
|
+ so.item_spec,
|
|
|
+ IFNULL(so.order_qty, 0),
|
|
|
+ IFNULL(p.plan_qty, 0),
|
|
|
+ IFNULL(a.real_qty, 0),
|
|
|
+ GREATEST(IFNULL(so.order_qty, 0) - IFNULL(a.real_qty, 0), 0),
|
|
|
+ so.order_date,
|
|
|
+ so.customer_request_date,
|
|
|
+ so.plan_delivery_date,
|
|
|
+ so.promised_delivery_date,
|
|
|
+ p.plan_ship_date,
|
|
|
+ a.actual_ship_date,
|
|
|
+ so.review_status,
|
|
|
+ so.order_status,
|
|
|
+ CASE
|
|
|
+ WHEN IFNULL(so.order_qty, 0) > 0 AND IFNULL(a.real_qty, 0) >= IFNULL(so.order_qty, 0) THEN 'COMPLETED'
|
|
|
+ WHEN COALESCE(p.plan_ship_date, so.promised_delivery_date, so.plan_delivery_date) < @Now THEN 'DELAYED'
|
|
|
+ WHEN IFNULL(p.plan_qty, 0) > 0 THEN 'PLANNED'
|
|
|
+ ELSE 'OPEN'
|
|
|
+ END,
|
|
|
+ l.linkage_status,
|
|
|
+ CASE
|
|
|
+ WHEN COALESCE(p.plan_ship_date, so.promised_delivery_date, so.plan_delivery_date) < @Now
|
|
|
+ AND IFNULL(a.real_qty, 0) < IFNULL(so.order_qty, 0) THEN 'HIGH'
|
|
|
+ WHEN IFNULL(a.real_qty, 0) < IFNULL(so.order_qty, 0) THEN 'MEDIUM'
|
|
|
+ ELSE 'LOW'
|
|
|
+ END,
|
|
|
+ 'AIDOP',
|
|
|
+ so.source_table,
|
|
|
+ so.source_row_id,
|
|
|
+ so.source_biz_key,
|
|
|
+ so.sync_batch_id,
|
|
|
+ @BatchId,
|
|
|
+ @Now
|
|
|
+ FROM mdp_std_so so
|
|
|
+ LEFT JOIN (
|
|
|
+ SELECT tenant_id, order_no, IFNULL(order_line, '') AS order_line, IFNULL(item_code, '') AS item_code,
|
|
|
+ SUM(IFNULL(plan_qty, IFNULL(qty, 0))) AS plan_qty,
|
|
|
+ MIN(plan_ship_date) AS plan_ship_date
|
|
|
+ FROM mdp_std_ship_trans
|
|
|
+ WHERE trans_type='SHIP_PLAN'
|
|
|
+ GROUP BY tenant_id, order_no, IFNULL(order_line, ''), IFNULL(item_code, '')
|
|
|
+ ) p ON so.tenant_id=p.tenant_id AND so.order_no=p.order_no AND IFNULL(so.order_line, '')=p.order_line AND IFNULL(so.item_code, '')=p.item_code
|
|
|
+ LEFT JOIN (
|
|
|
+ SELECT tenant_id, order_no, IFNULL(order_line, '') AS order_line, IFNULL(item_code, '') AS item_code,
|
|
|
+ SUM(IFNULL(real_qty, IFNULL(qty_to_ship, 0))) AS real_qty,
|
|
|
+ MAX(actual_ship_date) AS actual_ship_date
|
|
|
+ FROM mdp_std_ship_trans
|
|
|
+ WHERE trans_type='ASN_SHIPPER'
|
|
|
+ GROUP BY tenant_id, order_no, IFNULL(order_line, ''), IFNULL(item_code, '')
|
|
|
+ ) a ON so.tenant_id=a.tenant_id AND so.order_no=a.order_no AND IFNULL(so.order_line, '')=a.order_line AND IFNULL(so.item_code, '')=a.item_code
|
|
|
+ LEFT JOIN (
|
|
|
+ SELECT tenant_id, order_no, item_code, MAX(linkage_status) AS linkage_status
|
|
|
+ FROM mdp_std_ship_trans
|
|
|
+ WHERE IFNULL(linkage_status, '') <> ''
|
|
|
+ GROUP BY tenant_id, order_no, item_code
|
|
|
+ ) l ON so.tenant_id=l.tenant_id AND so.order_no=l.order_no AND IFNULL(so.item_code, '')=IFNULL(l.item_code, '')
|
|
|
+ WHERE IFNULL(so.order_no, '') <> ''
|
|
|
+ ON DUPLICATE KEY UPDATE
|
|
|
+ customer_no=VALUES(customer_no), customer_name=VALUES(customer_name), item_name=VALUES(item_name),
|
|
|
+ order_qty=VALUES(order_qty), planned_ship_qty=VALUES(planned_ship_qty), shipped_qty=VALUES(shipped_qty),
|
|
|
+ remaining_qty=VALUES(remaining_qty), plan_ship_date=VALUES(plan_ship_date), actual_ship_date=VALUES(actual_ship_date),
|
|
|
+ review_status=VALUES(review_status), order_status=VALUES(order_status), delivery_status=VALUES(delivery_status),
|
|
|
+ linkage_status=VALUES(linkage_status), risk_level=VALUES(risk_level), sync_batch_id=VALUES(sync_batch_id),
|
|
|
+ calc_batch_id=VALUES(calc_batch_id), calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
|
|
|
+ """, batchId, now);
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task<long> InsertSyncLogAsync(long entityId, string entityName, string batchId, int rowsRead)
|
|
|
+ {
|
|
|
+ await _db.Ado.ExecuteCommandAsync(
|
|
|
+ """
|
|
|
+ INSERT INTO mdp_sync_log
|
|
|
+ (tenant_id, entity_id, source_code, entity_name, sync_batch_id, sync_type, trigger_type, sync_start, rows_read, status)
|
|
|
+ VALUES (0, @EntityId, 'AIDOPDEV_MYSQL', @EntityName, @BatchId, 'FULL', 'AUTO', NOW(), @RowsRead, 'RUNNING')
|
|
|
+ """,
|
|
|
+ new SugarParameter("@EntityId", entityId),
|
|
|
+ new SugarParameter("@EntityName", entityName),
|
|
|
+ new SugarParameter("@BatchId", batchId),
|
|
|
+ new SugarParameter("@RowsRead", rowsRead));
|
|
|
+ return await _db.Ado.GetLongAsync(
|
|
|
+ "SELECT id FROM mdp_sync_log WHERE sync_batch_id=@BatchId AND entity_id=@EntityId ORDER BY id DESC LIMIT 1",
|
|
|
+ new List<SugarParameter>
|
|
|
+ {
|
|
|
+ new("@BatchId", batchId),
|
|
|
+ new("@EntityId", entityId)
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task MarkSyncLogSuccessAsync(long logId, DateTime started, int affected)
|
|
|
+ {
|
|
|
+ await _db.Ado.ExecuteCommandAsync(
|
|
|
+ """
|
|
|
+ UPDATE mdp_sync_log
|
|
|
+ SET sync_end=NOW(), duration_ms=@DurationMs, rows_insert=@RowsInsert, rows_update=0, rows_skip=0, rows_error=0, status='SUCCESS'
|
|
|
+ WHERE id=@Id
|
|
|
+ """,
|
|
|
+ new SugarParameter("@DurationMs", (int)(DateTime.Now - started).TotalMilliseconds),
|
|
|
+ new SugarParameter("@RowsInsert", affected),
|
|
|
+ new SugarParameter("@Id", logId));
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task MarkSyncLogFailedAsync(long logId, DateTime started, string message)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ await _db.Ado.ExecuteCommandAsync(
|
|
|
+ """
|
|
|
+ UPDATE mdp_sync_log
|
|
|
+ SET sync_end=NOW(), duration_ms=@DurationMs, rows_error=1, status='FAILED', error_msg=@ErrorMsg
|
|
|
+ WHERE id=@Id
|
|
|
+ """,
|
|
|
+ new SugarParameter("@DurationMs", (int)(DateTime.Now - started).TotalMilliseconds),
|
|
|
+ new SugarParameter("@ErrorMsg", Truncate(message, 1000)),
|
|
|
+ new SugarParameter("@Id", logId));
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ // 写库自身失败兜底:避免再抛掩盖原异常;遗留 RUNNING 行可由运维手动清理
|
|
|
+ Console.Error.WriteLine($"[S1MdpSyncTransform] MarkSyncLogFailed write failed (syncLogId={logId}): {ex.Message}");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task<long> InsertTransformRunLogAsync(string batchId, DateTime startedAt, string triggerType)
|
|
|
+ {
|
|
|
+ await _db.Ado.ExecuteCommandAsync(
|
|
|
+ """
|
|
|
+ INSERT INTO mdp_transform_run_log
|
|
|
+ (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time)
|
|
|
+ VALUES (0, @JobCode, 'S1 MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime)
|
|
|
+ """,
|
|
|
+ new SugarParameter("@JobCode", JobCode),
|
|
|
+ new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)),
|
|
|
+ new SugarParameter("@BatchId", batchId),
|
|
|
+ new SugarParameter("@StartTime", startedAt));
|
|
|
+ return await _db.Ado.GetLongAsync(
|
|
|
+ "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1",
|
|
|
+ new List<SugarParameter> { new("@BatchId", batchId) });
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task MarkTransformRunSuccessAsync(long runLogId, DateTime startedAt, S1MdpSyncTransformResult result)
|
|
|
+ {
|
|
|
+ var finishedAt = DateTime.Now;
|
|
|
+ await _db.Ado.ExecuteCommandAsync(
|
|
|
+ """
|
|
|
+ UPDATE mdp_transform_run_log
|
|
|
+ SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
|
|
|
+ stage_rows=@StageRows, standard_rows=@StandardRows, dwd_rows=@DwdRows,
|
|
|
+ summary_json=@SummaryJson, update_time=CURRENT_TIMESTAMP
|
|
|
+ WHERE id=@Id
|
|
|
+ """,
|
|
|
+ new SugarParameter("@EndTime", finishedAt),
|
|
|
+ new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
|
|
|
+ new SugarParameter("@StageRows", result.StageRows),
|
|
|
+ new SugarParameter("@StandardRows", result.StandardRows),
|
|
|
+ new SugarParameter("@DwdRows", result.DwdRows),
|
|
|
+ new SugarParameter("@SummaryJson", BuildRunSummaryJson(result)),
|
|
|
+ new SugarParameter("@Id", runLogId));
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task MarkTransformRunFailedAsync(long runLogId, DateTime startedAt, string message)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ var finishedAt = DateTime.Now;
|
|
|
+ await _db.Ado.ExecuteCommandAsync(
|
|
|
+ """
|
|
|
+ UPDATE mdp_transform_run_log
|
|
|
+ SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
|
|
|
+ error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
|
|
|
+ WHERE id=@Id
|
|
|
+ """,
|
|
|
+ new SugarParameter("@EndTime", finishedAt),
|
|
|
+ new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
|
|
|
+ new SugarParameter("@ErrorMessage", Truncate(message, 2000)),
|
|
|
+ new SugarParameter("@Id", runLogId));
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ // 写库自身失败兜底(典型场景:远端 MySQL 瞬断导致 MarkFailed 自身也连不上):
|
|
|
+ // 避免再抛二次异常掩盖原错;遗留 RUNNING 行可由运维手动清理。
|
|
|
+ Console.Error.WriteLine($"[S1MdpSyncTransform] MarkTransformRunFailed write failed (runLogId={runLogId}): {ex.Message}");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static S1MdpSqlCommand Cmd(string sql, string batchId, DateTime now)
|
|
|
+ {
|
|
|
+ return new S1MdpSqlCommand(sql, new[]
|
|
|
+ {
|
|
|
+ new SugarParameter("@BatchId", batchId),
|
|
|
+ new SugarParameter("@Now", now),
|
|
|
+ new SugarParameter("@StatDate", now.Date)
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private static string BuildJsonObjectExpression(IEnumerable<string> columns)
|
|
|
+ {
|
|
|
+ var parts = columns.SelectMany(c => new[] { $"'{c.Replace("'", "''")}'", $"s.`{c}`" });
|
|
|
+ return $"JSON_OBJECT({string.Join(",", parts)})";
|
|
|
+ }
|
|
|
+
|
|
|
+ private static string BuildOptionalColumnExpr(IReadOnlyCollection<string> columns, string expected, string fallback)
|
|
|
+ {
|
|
|
+ return columns.Any(u => string.Equals(u, expected, StringComparison.OrdinalIgnoreCase))
|
|
|
+ ? $"s.`{FindColumn(columns, expected)}`"
|
|
|
+ : fallback;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static string FindColumn(IEnumerable<string> columns, string expected)
|
|
|
+ {
|
|
|
+ return columns.First(u => string.Equals(u, expected, StringComparison.OrdinalIgnoreCase));
|
|
|
+ }
|
|
|
+
|
|
|
+ private static string NormalizeTriggerType(string? triggerType)
|
|
|
+ {
|
|
|
+ return string.IsNullOrWhiteSpace(triggerType) ? "AUTO" : triggerType.Trim().ToUpperInvariant();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static string BuildRunSummaryJson(S1MdpSyncTransformResult result)
|
|
|
+ {
|
|
|
+ return $$"""{"batchId":"{{result.BatchId}}","stageRows":{{result.StageRows}},"standardRows":{{result.StandardRows}},"dwdRows":{{result.DwdRows}},"kpiRows":{{result.KpiRows}}}""";
|
|
|
+ }
|
|
|
+
|
|
|
+ private static string ResolveKpiValueTable(int metricLevel)
|
|
|
+ {
|
|
|
+ return metricLevel switch
|
|
|
+ {
|
|
|
+ 1 => "ado_s9_kpi_value_l1_day",
|
|
|
+ 2 => "ado_s9_kpi_value_l2_day",
|
|
|
+ 3 => "ado_s9_kpi_value_l3_day",
|
|
|
+ 4 => "ado_s9_kpi_value_l4_day",
|
|
|
+ _ => "ado_s9_kpi_value_l2_day"
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ private static decimal DefaultS1Target(string metricCode)
|
|
|
+ {
|
|
|
+ return metricCode switch
|
|
|
+ {
|
|
|
+ "S1_L1_001" or "S1_L2_010" => 3m,
|
|
|
+ "S1_L1_002" or "S1_L2_011" => 95m,
|
|
|
+ "S1_L1_003" or "S1_L2_012" => 100m,
|
|
|
+ _ => 0m
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ private static string ResolveKpiStatus(decimal actual, decimal target, string? direction, decimal? yellowThreshold, decimal? redThreshold)
|
|
|
+ {
|
|
|
+ if (target <= 0) return "gray";
|
|
|
+
|
|
|
+ var ratio = actual / target * 100m;
|
|
|
+ if (string.Equals(direction, "lower_is_better", StringComparison.OrdinalIgnoreCase))
|
|
|
+ {
|
|
|
+ if (actual <= target) return "green";
|
|
|
+ if (ratio <= (yellowThreshold ?? 110m)) return "yellow";
|
|
|
+ return ratio >= (redThreshold ?? 120m) ? "red" : "yellow";
|
|
|
+ }
|
|
|
+
|
|
|
+ if (actual >= target) return "green";
|
|
|
+ if (ratio >= (yellowThreshold ?? 95m)) return "yellow";
|
|
|
+ return ratio <= (redThreshold ?? 80m) ? "red" : "yellow";
|
|
|
+ }
|
|
|
+
|
|
|
+ private static string ResolveTrendFlag(decimal actual, decimal? previous)
|
|
|
+ {
|
|
|
+ if (previous == null) return "flat";
|
|
|
+ if (actual > previous.Value) return "up";
|
|
|
+ if (actual < previous.Value) return "down";
|
|
|
+ return "flat";
|
|
|
+ }
|
|
|
+
|
|
|
+ private static string Truncate(string? raw, int maxLength)
|
|
|
+ {
|
|
|
+ if (string.IsNullOrEmpty(raw)) return string.Empty;
|
|
|
+ return raw.Length <= maxLength ? raw : raw[..maxLength];
|
|
|
+ }
|
|
|
+
|
|
|
+ private sealed class S1ColumnRow
|
|
|
+ {
|
|
|
+ public string ColumnName { get; set; } = string.Empty;
|
|
|
+ }
|
|
|
+
|
|
|
+ private sealed class S1MdpEntityRow
|
|
|
+ {
|
|
|
+ public long Id { get; set; }
|
|
|
+ public string EntityName { get; set; } = string.Empty;
|
|
|
+ }
|
|
|
+
|
|
|
+ private sealed class S1KpiCalcRow
|
|
|
+ {
|
|
|
+ public long TenantId { get; set; }
|
|
|
+ public long FactoryId { get; set; }
|
|
|
+ public string MetricCode { get; set; } = string.Empty;
|
|
|
+ public decimal? MetricValue { get; set; }
|
|
|
+ }
|
|
|
+
|
|
|
+ private sealed class S1KpiMetaRow
|
|
|
+ {
|
|
|
+ public int MetricLevel { get; set; }
|
|
|
+ public string Direction { get; set; } = "higher_is_better";
|
|
|
+ public decimal? YellowThreshold { get; set; }
|
|
|
+ public decimal? RedThreshold { get; set; }
|
|
|
+ }
|
|
|
+
|
|
|
+ private sealed class S1KpiValueRow
|
|
|
+ {
|
|
|
+ public long Id { get; set; }
|
|
|
+ public decimal? MetricValue { get; set; }
|
|
|
+ public decimal? TargetValue { get; set; }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+public sealed class S1MdpSyncTransformResult
|
|
|
+{
|
|
|
+ public long RunLogId { get; set; }
|
|
|
+ public string BatchId { get; set; } = string.Empty;
|
|
|
+ public int StageRows { get; set; }
|
|
|
+ public int StandardRows { get; set; }
|
|
|
+ public int DwdRows { get; set; }
|
|
|
+ public int KpiRows { get; set; }
|
|
|
+}
|
|
|
+
|
|
|
+internal sealed record S1MdpSqlCommand(string Sql, SugarParameter[] Parameters);
|
|
|
+
|
|
|
+internal sealed record S1MdpEntityConfig(
|
|
|
+ string EntityCode,
|
|
|
+ string SourceTable,
|
|
|
+ string TargetTable,
|
|
|
+ string SourceRowIdExpression,
|
|
|
+ string SourceBizKeyExpression)
|
|
|
+{
|
|
|
+ public static readonly IReadOnlyList<S1MdpEntityConfig> All = new List<S1MdpEntityConfig>
|
|
|
+ {
|
|
|
+ new("S1_SEORDER", "crm_seorder", "mdp_stg_so", "Id", "COALESCE(s.`bill_no`, CAST(s.`Id` AS CHAR))"),
|
|
|
+ new("S1_SEORDER_ENTRY", "crm_seorderentry", "mdp_stg_so", "Id", "CONCAT(IFNULL(s.`bill_no`,''), ':', IFNULL(s.`entry_seq`, CAST(s.`Id` AS CHAR)))"),
|
|
|
+ new("S1_SEORDER_CHANGE", "crm_seorder_change", "mdp_stg_so", "Id", "CONCAT(IFNULL(s.`bill_no`,''), ':', CAST(s.`Id` AS CHAR))"),
|
|
|
+ new("S1_CONTRACT_REVIEW", "ado_contract_review", "mdp_stg_so", "RecID", "COALESCE(s.`BillNo`, CAST(s.`RecID` AS CHAR))"),
|
|
|
+ new("S1_CONTRACT_REVIEW_FLOW", "ado_contract_review_flow", "mdp_stg_so", "RecID", "CONCAT(IFNULL(s.`ReviewBillNo`,''), ':', IFNULL(s.`StageNo`,''), ':', CAST(s.`RecID` AS CHAR))"),
|
|
|
+ new("S1_SHIPPING_PLAN", "ShippingPlan", "mdp_stg_ship_trans", "RecID", "COALESCE(s.`LotSerial`, CAST(s.`RecID` AS CHAR))"),
|
|
|
+ new("S1_SHIPPING_PLAN_DETAIL", "ShippingPlanDetail", "mdp_stg_ship_trans", "RecID", "CONCAT(IFNULL(s.`plan_id`,''), ':', IFNULL(s.`OrdNbr`,''), ':', CAST(s.`RecID` AS CHAR))"),
|
|
|
+ new("S1_ASN_SHIPPER_MASTER", "ASNBOLShipperMaster", "mdp_stg_ship_trans", "RecID", "COALESCE(s.`Id`, CONCAT(IFNULL(s.`OrdNbr`,''), ':', CAST(s.`RecID` AS CHAR)))"),
|
|
|
+ new("S1_ASN_SHIPPER_DETAIL", "ASNBOLShipperDetail", "mdp_stg_ship_trans", "RecID", "CONCAT(IFNULL(s.`Id`,''), ':', IFNULL(s.`Line`, CAST(s.`RecID` AS CHAR)))"),
|
|
|
+ new("S1_LINKAGE_PLAN", "LinkagePlan", "mdp_stg_ship_trans", "id", "CONCAT(IFNULL(s.`bill_no`,''), ':', IFNULL(s.`item_number`,''), ':', CAST(s.`id` AS CHAR))")
|
|
|
+ };
|
|
|
+}
|