using System.Globalization;
using System.Text.Json;
namespace Admin.NET.Plugin.AiDOP.FinishedWarehouse;
/// <summary>
/// S7 finished-goods warehouse — T8 KPI data foundation and refresh/transform service.
/// Path A: the original SQL logic from column J of Teacher Fang's v5.4 KPI field mapping table
/// is sent directly to the T8 SQL Server (ConfigId=t8_v5).
/// KPIs covered: S7_L1_001 order shipment cycle / S7_L1_002 order shipment fulfillment rate /
/// S7_L1_003 finished-goods warehouse labor efficiency.
/// </summary>
public class S7MdpSyncTransformService : ITransient
{
    private readonly ISqlSugarClient _db;
    private const string JobCode = "S7_MDP_SYNC_TRANSFORM";
    private const string JobName = "S7 成品仓储 MDP 同步与转换";
    private const string T8ConfigId = "t8_v5";
    private const string ModuleCode = "S7";

    /// <summary>Creates the service on top of the injected SqlSugar client.</summary>
    /// <param name="db">Client used for the default connection and, via its tenant API, the T8 connection.</param>
    public S7MdpSyncTransformService(ISqlSugarClient db)
    {
        _db = db;
    }

    /// <summary>
    /// Runs the full refresh: writes a RUNNING row to <c>mdp_transform_run_log</c>, builds the three S7 KPIs in
    /// order (S7_L1_001, S7_L1_002, S7_L1_003) and finally marks the run SUCCESS or FAILED.
    /// </summary>
    /// <param name="cancellationToken">Checked on entry and forwarded to the KPI builders.</param>
    /// <param name="triggerType">Free-text trigger label; blank becomes "AUTO", otherwise trimmed and upper-cased.</param>
    /// <param name="option">Refresh window; when null, <see cref="S7MdpRefreshOption.Default"/> is used.</param>
    /// <returns>The aggregated result of the three KPI builders.</returns>
    /// <remarks>Any exception from a builder is recorded on the run-log row (best effort) and then rethrown.</remarks>
    public async Task<S7MdpSyncTransformResult> RunFullAsync(
        CancellationToken cancellationToken = default,
        string triggerType = "AUTO",
        S7MdpRefreshOption? option = null)
    {
        cancellationToken.ThrowIfCancellationRequested();
        option ??= S7MdpRefreshOption.Default();

        var now = DateTime.Now;
        // Invariant culture keeps the batch id in Gregorian digits regardless of the host's default calendar.
        var batchId = "S7_MDP_FULL_" + now.ToString("yyyyMMddHHmmss", CultureInfo.InvariantCulture);
        var normalizedTrigger = NormalizeTriggerType(triggerType);
        var runLogId = await InsertTransformRunLogAsync(batchId, now, normalizedTrigger);

        var result = new S7MdpSyncTransformResult
        {
            BatchId = batchId,
            RunLogId = runLogId,
            TriggerType = normalizedTrigger,
            SourceZtid = option.SourceZtid,
            BizDate = option.BizDate,
            BizMonth = option.BizMonth,
            MonthlyPeriodStart = option.MonthlyPeriodStart,
            MonthlyPeriodEnd = option.MonthlyPeriodEnd
        };

        try
        {
            // This job has no stage/standard layer; the counters are reported as 0.
            result.StageRows = 0;
            result.StandardRows = 0;

            var cycleSub = await BuildS7L1001OrderShipmentCycleAsync(batchId, now, option, cancellationToken);
            result.MergeSub("S7_L1_001", cycleSub);

            var fulfillmentSub = await BuildS7L1002OrderShipmentFulfillmentAsync(batchId, now, option, cancellationToken);
            result.MergeSub("S7_L1_002", fulfillmentSub);

            var efficiencySub = await BuildS7L1003FinishedWarehouseEfficiencyAsync(batchId, now, option, cancellationToken);
            result.MergeSub("S7_L1_003", efficiencySub);

            await MarkTransformRunSuccessAsync(runLogId, now, result);
            return result;
        }
        catch (Exception ex)
        {
            await MarkTransformRunFailedAsync(runLogId, now, ex.Message);
            throw;
        }
    }

    // ─────────────────────────────────────────────────────────────────────────

    /// <summary>
    /// S7_L1_001 order shipment cycle = latest shipment date − earliest FQC inspection-request date (5-table JOIN).
    /// Each order row returned by T8 is upserted into <c>dwd_t8_order_shipment_cycle</c>; the KPI value is the
    /// average of the non-null cycle days, rounded to 4 decimals, or NULL when there are none.
    /// </summary>
    private async Task<KpiBuildSubResult> BuildS7L1001OrderShipmentCycleAsync(
        string batchId, DateTime now, S7MdpRefreshOption option, CancellationToken ct)
    {
        var sub = new KpiBuildSubResult();
        const string sql = @"
select noid as noid,
datediff(day, min(shdate), max(shtime)) as scts
from (
select a.noid as noid, b.code as code,
IsNull(c.shdate, b.addtime) as shdate,
(case when b.gdyn=1 then b.gdtime else d.shtime end) as shtime,
(case when b.gdyn=1 or b.sl<=b.slzx then 1 else 0 end) as wczt
from kc_dd_head a with(nolock)
left join kc_dd_list b with(nolock) on a.Id=b.idid
left join (
select min(b.id) as id, b.lynoid as lynoid, b.code as code,
max(a.shtime) as shtime, sum(b.slzx) as slzx
from kc_tz_head a with(nolock)
inner join kc_tz_list b with(nolock) on a.Id=b.idid
where a.ztid=@ztid and a.lbs='销售出库' and a.hzyn=0 and a.zfyn=0 and a.shyn=1
group by b.lynoid, b.code
) d on b.rwnoid=d.lynoid and b.code=d.code
left join kc_zj_list c on a.ztid=c.ztid and c.lyid=b.id and c.zjyn=1
where a.ztid=@ztid and a.lbs='销售订单' and a.zf=0 and a.shyn=1
) n
group by noid
having min(wczt)=1";

        ct.ThrowIfCancellationRequested();
        var rows = await QueryT8Async<S7CycleRow>(sql, new[] { new SugarParameter("@ztid", option.SourceZtid) });
        sub.T8Rows = rows.Count;

        var dwdAffected = 0;
        var cycleList = new List<int>();
        foreach (var r in rows)
        {
            ct.ThrowIfCancellationRequested();
            if (string.IsNullOrEmpty(r.noid))
            {
                continue;
            }

            // Rows with a NULL cycle are still written to DWD but do not take part in the average.
            if (r.scts.HasValue)
            {
                cycleList.Add(r.scts.Value);
            }

            // One upsert per order; the sum below is whatever affected-row count the driver reports.
            dwdAffected += await _db.Ado.ExecuteCommandAsync(@"
INSERT INTO dwd_t8_order_shipment_cycle
(tenant_id, factory_id, biz_date, source_ztid, order_no, cycle_days, batch_id, create_time)
VALUES
(0, 1, @bizDate, @ztid, @orderNo, @cycleDays, @batchId, @now)
ON DUPLICATE KEY UPDATE
cycle_days=VALUES(cycle_days),
batch_id=VALUES(batch_id), update_time=@now",
                new SugarParameter("@bizDate", option.BizDate),
                new SugarParameter("@ztid", option.SourceZtid),
                new SugarParameter("@orderNo", r.noid),
                new SugarParameter("@cycleDays", r.scts),
                new SugarParameter("@batchId", batchId),
                new SugarParameter("@now", now));
        }

        sub.DwdRows = dwdAffected;
        decimal? metricValue = cycleList.Count > 0
            ? Math.Round((decimal)cycleList.Average(), 4)
            : null;
        sub.KpiRows = await UpsertKpiValueAsync("S7_L1_001", option.BizDate, metricValue, now);
        sub.DenominatorStatus = cycleList.Count > 0 ? "OK" : "NO_COMPLETED_ORDER";
        return sub;
    }

    /// <summary>
    /// S7_L1_002 order shipment fulfillment rate = (order lines shipped on or before the due date /
    /// total lines of the order) × 100%.
    /// Each order is upserted into <c>dwd_t8_order_shipment_fulfillment</c>; the KPI value is the average of the
    /// per-order rates, rounded to 4 decimals, or NULL when no order yields a rate.
    /// </summary>
    private async Task<KpiBuildSubResult> BuildS7L1002OrderShipmentFulfillmentAsync(
        string batchId, DateTime now, S7MdpRefreshOption option, CancellationToken ct)
    {
        var sub = new KpiBuildSubResult();
        const string sql = @"
select noid as noid,
count(noid) as total_rows,
sum(wczt) as in_window_rows
from (
select a.noid as noid, b.rwnoid as rwnoid, b.code as code,
(case when sum(d.slzx)>=b.sl then 1 else 0 end) as wczt
from kc_dd_head a with(nolock)
left join kc_dd_list b with(nolock) on a.Id=b.idid
left join (
select b.lynoid as lynoid, b.code as code,
convert(varchar(10), a.shtime, 23) as shtime, b.slzx as slzx
from kc_tz_head a with(nolock)
inner join kc_tz_list b with(nolock) on a.Id=b.idid
where a.ztid=@ztid and a.lbs='销售出库' and a.hzyn=0 and a.zfyn=0 and a.shyn=1
) d on b.rwnoid=d.lynoid and b.code=d.code and d.shtime<=b.jhdate
where a.ztid=@ztid and a.lbs='销售订单' and a.zf=0 and a.shyn=1
group by a.noid, b.rwnoid, b.code, b.sl
) n
group by noid";

        ct.ThrowIfCancellationRequested();
        var rows = await QueryT8Async<S7FulfillmentRow>(sql, new[] { new SugarParameter("@ztid", option.SourceZtid) });
        sub.T8Rows = rows.Count;

        var dwdAffected = 0;
        var rateList = new List<decimal>();
        foreach (var r in rows)
        {
            ct.ThrowIfCancellationRequested();
            if (string.IsNullOrEmpty(r.noid))
            {
                continue;
            }

            // A zero line count yields a NULL rate rather than a fabricated 0.
            decimal? rate = (r.total_rows > 0)
                ? Math.Round((decimal)r.in_window_rows / r.total_rows * 100m, 4)
                : null;
            if (rate.HasValue)
            {
                rateList.Add(rate.Value);
            }

            dwdAffected += await _db.Ado.ExecuteCommandAsync(@"
INSERT INTO dwd_t8_order_shipment_fulfillment
(tenant_id, factory_id, biz_date, source_ztid, order_no,
total_rows, in_window_rows, fulfillment_rate, batch_id, create_time)
VALUES
(0, 1, @bizDate, @ztid, @orderNo,
@total, @inWindow, @rate, @batchId, @now)
ON DUPLICATE KEY UPDATE
total_rows=VALUES(total_rows), in_window_rows=VALUES(in_window_rows),
fulfillment_rate=VALUES(fulfillment_rate),
batch_id=VALUES(batch_id), update_time=@now",
                new SugarParameter("@bizDate", option.BizDate),
                new SugarParameter("@ztid", option.SourceZtid),
                new SugarParameter("@orderNo", r.noid),
                new SugarParameter("@total", r.total_rows),
                new SugarParameter("@inWindow", r.in_window_rows),
                new SugarParameter("@rate", rate),
                new SugarParameter("@batchId", batchId),
                new SugarParameter("@now", now));
        }

        sub.DwdRows = dwdAffected;
        decimal? metricValue = rateList.Count > 0
            ? Math.Round(rateList.Average(), 4)
            : null;
        sub.KpiRows = await UpsertKpiValueAsync("S7_L1_002", option.BizDate, metricValue, now);
        sub.DenominatorStatus = rateList.Count > 0 ? "OK" : "NO_VALID_ORDER";
        return sub;
    }

    /// <summary>
    /// S7_L1_003 finished-goods warehouse labor efficiency =
    /// SUM(slzx where lbs=销售出库) / count(gw=仓管), over the monthly period of <paramref name="option"/>.
    /// One row per (month, ztid) is upserted into <c>dwd_t8_finished_warehouse_efficiency</c>; the KPI value is
    /// NULL when either the headcount or the shipment rows are missing.
    /// </summary>
    private async Task<KpiBuildSubResult> BuildS7L1003FinishedWarehouseEfficiencyAsync(
        string batchId, DateTime now, S7MdpRefreshOption option, CancellationToken ct)
    {
        var sub = new KpiBuildSubResult();
        const string sqlNumer = @"
select b.lynoid as lynoid, b.code as code,
convert(varchar(10), a.shtime, 23) as shtime, b.slzx as slzx
from kc_tz_head a with(nolock)
inner join kc_tz_list b with(nolock) on a.Id=b.idid
where a.ztid=@ztid and a.lbs='销售出库' and a.hzyn=0 and a.zfyn=0 and a.shyn=1
and convert(varchar(10), a.shtime, 23) between @startDateText and @endDateText";
        const string sqlDenom = @"
select count(*) as penum
from sys_pelist with(nolock)
where ztid=@ztid and zzzt='在职' and gw='仓管'";

        // The T8 query compares yyyy-MM-dd text, so the bounds must be formatted culture-independently.
        var pNumer = new[]
        {
            new SugarParameter("@ztid", option.SourceZtid),
            new SugarParameter("@startDateText", option.MonthlyPeriodStart.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture)),
            new SugarParameter("@endDateText", option.MonthlyPeriodEnd.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture))
        };
        var pDenom = new[] { new SugarParameter("@ztid", option.SourceZtid) };

        ct.ThrowIfCancellationRequested();
        var numerRows = await QueryT8Async<S7ShipmentDetailRow>(sqlNumer, pNumer);
        ct.ThrowIfCancellationRequested();
        var denomRows = await QueryT8Async<S7PeNumRow>(sqlDenom, pDenom);
        sub.T8Rows = numerRows.Count + denomRows.Count;

        // No shipment rows at all is reported as NULL (not 0) so it can be told apart from a real zero quantity.
        decimal? shipmentQty = numerRows.Count == 0
            ? null
            : numerRows.Sum(r => r.slzx ?? 0m);
        int? headcount = denomRows.FirstOrDefault()?.penum;

        decimal? efficiency = null;
        string denomStatus;
        if (!headcount.HasValue || headcount.Value <= 0)
        {
            denomStatus = "NO_HEADCOUNT";
        }
        else if (!shipmentQty.HasValue)
        {
            denomStatus = "NO_NUMERATOR";
        }
        else
        {
            efficiency = Math.Round(shipmentQty.Value / headcount.Value, 4);
            denomStatus = "OK";
        }
        sub.DenominatorStatus = denomStatus;

        ct.ThrowIfCancellationRequested();
        var dwdAffected = await _db.Ado.ExecuteCommandAsync(@"
INSERT INTO dwd_t8_finished_warehouse_efficiency
(tenant_id, factory_id, biz_month, source_ztid, period_start, period_end,
shipment_qty, warehouse_headcount, efficiency, denominator_status, batch_id, create_time)
VALUES
(0, 1, @bizMonth, @ztid, @periodStart, @periodEnd,
@shipmentQty, @headcount, @efficiency, @denomStatus, @batchId, @now)
ON DUPLICATE KEY UPDATE
period_start=VALUES(period_start), period_end=VALUES(period_end),
shipment_qty=VALUES(shipment_qty), warehouse_headcount=VALUES(warehouse_headcount),
efficiency=VALUES(efficiency), denominator_status=VALUES(denominator_status),
batch_id=VALUES(batch_id), update_time=@now",
            new SugarParameter("@bizMonth", option.BizMonth),
            new SugarParameter("@ztid", option.SourceZtid),
            new SugarParameter("@periodStart", option.MonthlyPeriodStart),
            new SugarParameter("@periodEnd", option.MonthlyPeriodEnd),
            new SugarParameter("@shipmentQty", shipmentQty),
            new SugarParameter("@headcount", headcount),
            new SugarParameter("@efficiency", efficiency),
            new SugarParameter("@denomStatus", denomStatus),
            new SugarParameter("@batchId", batchId),
            new SugarParameter("@now", now));
        sub.DwdRows = dwdAffected;

        // The monthly KPI is stored on the period-end day; UpsertKpiValueAsync strips the time of day.
        sub.KpiRows = await UpsertKpiValueAsync("S7_L1_003", option.MonthlyPeriodEnd, efficiency, now);
        return sub;
    }

    // ─────────────────────────────────────────────────────────────────────────

    /// <summary>Runs <paramref name="sql"/> on the T8 connection (ConfigId <c>t8_v5</c>) and maps rows to <typeparamref name="T"/>.</summary>
    private async Task<List<T>> QueryT8Async<T>(string sql, SugarParameter[] parameters)
    {
        var t8 = _db.AsTenant().GetConnectionScope(T8ConfigId);
        return await t8.Ado.SqlQueryAsync<T>(sql, parameters);
    }

    /// <summary>
    /// Inserts or updates the S7 row of <c>ado_s9_kpi_value_l1_day</c> for (metric, date) with tenant 0 / factory 1.
    /// </summary>
    /// <returns>The affected-row count of the UPDATE or INSERT that was executed.</returns>
    private async Task<int> UpsertKpiValueAsync(string metricCode, DateTime bizDate, decimal? metricValue, DateTime now)
    {
        // Follows the S3 UpsertS3KpiValueAsync pattern: look up the existing row → UPDATE;
        // if none exists → SELECT MAX(id)+1 to generate the id explicitly, then INSERT.
        // ado_s9_kpi_value_l1_day.id is a manually assigned primary key (no AUTO_INCREMENT) and must be set explicitly;
        // metric_value may be NULL (a missing denominator must not be disguised as a real 0).
        // FIX-2: strip the time of day (the monthly KPI argument may be YYYY-MM-DD 23:59:59) so that
        // SELECT WHERE biz_date=@BizDate matches the DB date column and no duplicate INSERT happens.
        // NOTE(review): MAX(id)+1 is not safe if two writers insert concurrently — confirm this job is single-instance.
        bizDate = bizDate.Date;

        var existingId = await _db.Ado.GetLongAsync(
            "SELECT IFNULL((SELECT id FROM ado_s9_kpi_value_l1_day WHERE tenant_id=0 AND factory_id=1 " +
            "AND module_code=@ModuleCode AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0 " +
            "ORDER BY id LIMIT 1), 0)",
            new List<SugarParameter>
            {
                new("@ModuleCode", ModuleCode),
                new("@MetricCode", metricCode),
                new("@BizDate", bizDate)
            });

        if (existingId > 0)
        {
            return await _db.Ado.ExecuteCommandAsync(
                "UPDATE ado_s9_kpi_value_l1_day SET metric_value=@MetricValue, calc_time=@Now, " +
                "update_time=@Now, is_deleted=0, is_active=1 WHERE id=@Id",
                new SugarParameter("@MetricValue", metricValue),
                new SugarParameter("@Now", now),
                new SugarParameter("@Id", existingId));
        }

        var nextId = await _db.Ado.GetLongAsync(
            "SELECT COALESCE(MAX(id), 0) + 1 FROM ado_s9_kpi_value_l1_day");
        return await _db.Ado.ExecuteCommandAsync(@"
INSERT INTO ado_s9_kpi_value_l1_day
(id, tenant_id, org_id, company_id, factory_id, status, biz_date,
create_time, update_time, is_deleted, is_active,
module_code, metric_code, metric_value, calc_time)
VALUES
(@Id, 0, NULL, NULL, 1, NULL, @BizDate,
@Now, @Now, 0, 1,
@ModuleCode, @MetricCode, @MetricValue, @Now)",
            new SugarParameter("@Id", nextId),
            new SugarParameter("@BizDate", bizDate),
            new SugarParameter("@Now", now),
            new SugarParameter("@ModuleCode", ModuleCode),
            new SugarParameter("@MetricCode", metricCode),
            new SugarParameter("@MetricValue", metricValue));
    }

    /// <summary>Writes a RUNNING row to <c>mdp_transform_run_log</c> and returns its id.</summary>
    /// <remarks>
    /// The id is read back as the highest id carrying <paramref name="batchId"/>.
    /// NOTE(review): batch ids have one-second resolution, so two runs started in the same second would share one.
    /// </remarks>
    private async Task<long> InsertTransformRunLogAsync(string batchId, DateTime startedAt, string triggerType)
    {
        await _db.Ado.ExecuteCommandAsync(@"
INSERT INTO mdp_transform_run_log
(tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time, stage_rows, standard_rows, dwd_rows, create_time, update_time)
VALUES
(0, @JobCode, @JobName, @TriggerType, @BatchId, 'RUNNING', @StartTime, 0, 0, 0, @StartTime, @StartTime)",
            new SugarParameter("@JobCode", JobCode),
            new SugarParameter("@JobName", JobName),
            new SugarParameter("@TriggerType", triggerType),
            new SugarParameter("@BatchId", batchId),
            new SugarParameter("@StartTime", startedAt));

        return await _db.Ado.GetLongAsync(
            "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1",
            new List<SugarParameter> { new("@BatchId", batchId) });
    }

    /// <summary>Marks the run-log row SUCCESS and stores row counters plus a JSON summary of <paramref name="result"/>.</summary>
    private async Task MarkTransformRunSuccessAsync(long runLogId, DateTime startedAt, S7MdpSyncTransformResult result)
    {
        var finishedAt = DateTime.Now;
        await _db.Ado.ExecuteCommandAsync(@"
UPDATE mdp_transform_run_log
SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
stage_rows=@StageRows, standard_rows=@StandardRows, dwd_rows=@DwdRows,
summary_json=@SummaryJson, update_time=CURRENT_TIMESTAMP
WHERE id=@Id",
            new SugarParameter("@EndTime", finishedAt),
            new SugarParameter("@DurationMs", ElapsedMilliseconds(startedAt, finishedAt)),
            new SugarParameter("@StageRows", result.StageRows),
            new SugarParameter("@StandardRows", result.StandardRows),
            new SugarParameter("@DwdRows", result.DwdRows),
            new SugarParameter("@SummaryJson", JsonSerializer.Serialize(new
            {
                batchId = result.BatchId,
                sourceZtid = result.SourceZtid,
                bizDate = result.BizDate.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture),
                bizMonth = result.BizMonth,
                dwdRows = result.DwdRows,
                kpiRows = result.KpiRows,
                perKpiDwdRows = result.PerKpiDwdRows,
                perKpiKpiRows = result.PerKpiKpiRows,
                denominatorStatus = result.KpiDenominatorStatus
            })),
            new SugarParameter("@Id", runLogId));
    }

    /// <summary>
    /// Marks the run-log row FAILED with the error message truncated to 2000 characters.
    /// Best effort: a failure to write the log row is reported on stderr and swallowed so that the caller can
    /// rethrow the original exception.
    /// </summary>
    private async Task MarkTransformRunFailedAsync(long runLogId, DateTime startedAt, string message)
    {
        try
        {
            var finishedAt = DateTime.Now;
            await _db.Ado.ExecuteCommandAsync(@"
UPDATE mdp_transform_run_log
SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
WHERE id=@Id",
                new SugarParameter("@EndTime", finishedAt),
                new SugarParameter("@DurationMs", ElapsedMilliseconds(startedAt, finishedAt)),
                new SugarParameter("@ErrorMessage", Truncate(message, 2000)),
                new SugarParameter("@Id", runLogId));
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"[S7MdpSyncTransform] MarkTransformRunFailed write failed (runLogId={runLogId}): {ex.Message}");
        }
    }

    /// <summary>
    /// Whole milliseconds between the two instants, clamped to [0, int.MaxValue] so that a local-clock step-back
    /// or a very long run cannot produce a negative or overflowed value.
    /// </summary>
    private static int ElapsedMilliseconds(DateTime startedAt, DateTime finishedAt) =>
        (int)Math.Clamp((finishedAt - startedAt).TotalMilliseconds, 0d, int.MaxValue);

    /// <summary>Returns "AUTO" for null/blank input, otherwise the trimmed, upper-cased (invariant) text.</summary>
    private static string NormalizeTriggerType(string? s) =>
        string.IsNullOrWhiteSpace(s) ? "AUTO" : s.Trim().ToUpperInvariant();

    /// <summary>Returns "" for null/empty input, otherwise at most the first <paramref name="max"/> characters.</summary>
    private static string Truncate(string? s, int max) =>
        string.IsNullOrEmpty(s) ? "" : (s.Length <= max ? s : s.Substring(0, max));
}
// DTO ────────────────────────────────────────────────────────────────────────
/// <summary>
/// Refresh window for the S7 KPI job: the source account set, the day used for the daily KPIs and the
/// month/period used for the monthly KPI.
/// </summary>
public sealed class S7MdpRefreshOption
{
    /// <summary>Value bound to the <c>@ztid</c> parameter of the T8 queries.</summary>
    public string SourceZtid { get; set; } = "pbxfxp";

    /// <summary>Business date stamped on the daily KPI rows.</summary>
    public DateTime BizDate { get; set; }

    /// <summary>Business month label; <see cref="Default"/> produces it as <c>yyyy-MM</c>.</summary>
    public string BizMonth { get; set; } = "";

    /// <summary>Inclusive start of the monthly period.</summary>
    public DateTime MonthlyPeriodStart { get; set; }

    /// <summary>Inclusive end of the monthly period.</summary>
    public DateTime MonthlyPeriodEnd { get; set; }

    /// <summary>
    /// Builds the default window relative to <see cref="DateTime.Today"/>: BizDate is yesterday, and the monthly
    /// period is the previous calendar month from its first day 00:00:00 to its last day 23:59:59.
    /// </summary>
    /// <returns>A new option instance; the month label is formatted with the invariant culture.</returns>
    public static S7MdpRefreshOption Default()
    {
        var today = DateTime.Today;
        var previousMonth = today.AddMonths(-1);
        var monthStart = new DateTime(previousMonth.Year, previousMonth.Month, 1);

        return new S7MdpRefreshOption
        {
            SourceZtid = "pbxfxp",
            BizDate = today.AddDays(-1),
            // Invariant culture: the label must stay Gregorian "yyyy-MM" whatever the host's default calendar is.
            BizMonth = monthStart.ToString("yyyy-MM", CultureInfo.InvariantCulture),
            MonthlyPeriodStart = monthStart,
            // One second before the first instant of the following month, i.e. last day 23:59:59.
            MonthlyPeriodEnd = monthStart.AddMonths(1).AddSeconds(-1)
        };
    }
}
/// <summary>
/// Aggregated outcome of one full S7 refresh run: run identification, the refresh window that was used and
/// the row counters accumulated from each KPI builder.
/// </summary>
public sealed class S7MdpSyncTransformResult
{
    /// <summary>Batch identifier of the run.</summary>
    public string BatchId { get; set; } = "";

    /// <summary>Id of the run's row in the transform run log.</summary>
    public long RunLogId { get; set; }

    /// <summary>Normalized trigger label of the run.</summary>
    public string TriggerType { get; set; } = "AUTO";

    /// <summary>Source account set the run read from.</summary>
    public string SourceZtid { get; set; } = "";

    /// <summary>Business date of the daily KPIs.</summary>
    public DateTime BizDate { get; set; }

    /// <summary>Business month label of the monthly KPI.</summary>
    public string BizMonth { get; set; } = "";

    /// <summary>Start of the monthly period.</summary>
    public DateTime MonthlyPeriodStart { get; set; }

    /// <summary>End of the monthly period.</summary>
    public DateTime MonthlyPeriodEnd { get; set; }

    /// <summary>Stage-layer row counter.</summary>
    public int StageRows { get; set; }

    /// <summary>Standard-layer row counter.</summary>
    public int StandardRows { get; set; }

    /// <summary>Sum of the DWD row counters of all merged KPIs.</summary>
    public int DwdRows { get; set; }

    /// <summary>Sum of the KPI row counters of all merged KPIs.</summary>
    public int KpiRows { get; set; }

    /// <summary>DWD row counter per KPI code.</summary>
    public Dictionary<string, int> PerKpiDwdRows { get; } = new();

    /// <summary>KPI row counter per KPI code.</summary>
    public Dictionary<string, int> PerKpiKpiRows { get; } = new();

    /// <summary>One <c>"{kpiCode}:{status}"</c> entry per merged KPI, in merge order.</summary>
    public List<string> KpiDenominatorStatus { get; } = new();

    /// <summary>
    /// Folds one KPI builder's counters into this result: records them under <paramref name="kpiCode"/>
    /// (overwriting an earlier entry for the same code), adds them to the totals and appends the status entry.
    /// </summary>
    /// <param name="kpiCode">KPI code used as dictionary key and status prefix.</param>
    /// <param name="sub">Counters produced by the KPI builder.</param>
    /// <exception cref="ArgumentNullException"><paramref name="sub"/> is null.</exception>
    public void MergeSub(string kpiCode, KpiBuildSubResult sub)
    {
        // Fail with a named argument before any state is touched.
        ArgumentNullException.ThrowIfNull(sub);

        PerKpiDwdRows[kpiCode] = sub.DwdRows;
        PerKpiKpiRows[kpiCode] = sub.KpiRows;
        DwdRows += sub.DwdRows;
        KpiRows += sub.KpiRows;
        KpiDenominatorStatus.Add($"{kpiCode}:{sub.DenominatorStatus}");
    }
}
/// <summary>Counters reported by a single KPI builder.</summary>
public sealed class KpiBuildSubResult
{
    /// <summary>Number of rows read from T8.</summary>
    public int T8Rows { get; set; }

    /// <summary>Row counter for the DWD table writes.</summary>
    public int DwdRows { get; set; }

    /// <summary>Row counter for the KPI value table write.</summary>
    public int KpiRows { get; set; }

    /// <summary>Status code of the KPI's denominator/input; starts as "OK".</summary>
    public string DenominatorStatus { get; set; } = "OK";
}
// T8 result set 投影类型 ──────────────────────────────────────────────────────
/// <summary>
/// Row shape of the S7_L1_001 T8 query. Property names are lower-case on purpose: they mirror the
/// column aliases of the SQL result set.
/// </summary>
internal sealed class S7CycleRow
{
    /// <summary>Order number (<c>noid</c> column).</summary>
    public string? noid { get; set; }

    /// <summary>Cycle in days (<c>scts</c> column, a <c>datediff(day, …)</c>); may be NULL.</summary>
    public int? scts { get; set; }
}
/// <summary>
/// Row shape of the S7_L1_002 T8 query. Property names are lower-case on purpose: they mirror the
/// column aliases of the SQL result set.
/// </summary>
internal sealed class S7FulfillmentRow
{
    /// <summary>Order number (<c>noid</c> column).</summary>
    public string? noid { get; set; }

    /// <summary>Line count of the order (<c>count(noid)</c>).</summary>
    public int total_rows { get; set; }

    /// <summary>Number of lines flagged as fulfilled in time (<c>sum(wczt)</c>).</summary>
    public int in_window_rows { get; set; }
}
/// <summary>
/// Row shape of the S7_L1_003 numerator T8 query (one shipment detail line). Property names are lower-case
/// on purpose: they mirror the column aliases of the SQL result set.
/// </summary>
internal sealed class S7ShipmentDetailRow
{
    /// <summary><c>lynoid</c> column of the shipment line.</summary>
    public string? lynoid { get; set; }

    /// <summary><c>code</c> column of the shipment line.</summary>
    public string? code { get; set; }

    /// <summary>Shipment time as text (<c>convert(varchar(10), shtime, 23)</c>).</summary>
    public string? shtime { get; set; }

    /// <summary><c>slzx</c> quantity of the line; may be NULL.</summary>
    public decimal? slzx { get; set; }
}
/// <summary>
/// Row shape of the S7_L1_003 denominator T8 query. The property name is lower-case on purpose: it mirrors
/// the column alias of the SQL result set.
/// </summary>
internal sealed class S7PeNumRow
{
    /// <summary>Headcount (<c>count(*)</c> aliased as <c>penum</c>).</summary>
    public int penum { get; set; }
}