Files
CleanFlow/main.py
2025-10-07 23:18:45 +08:00

531 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# coding: utf-8
from flask import Flask, request, jsonify, send_from_directory
from flask_cors import CORS
import pandas as pd
import json
import re
import os
from datetime import datetime
from pathlib import Path
from functools import wraps
# Application object: files under ./static are served from the URL root.
app = Flask(__name__, static_folder='static', static_url_path='')
CORS(app)

# Configuration: both storage directories live next to this file and are
# created at import time if they do not exist yet.
BASE_DIR = Path(__file__).resolve().parent
SUSPECTS_FOLDER = BASE_DIR / "suspects"
SUSPECTS_FOLDER.mkdir(exist_ok=True)
UPLOAD_FOLDER = BASE_DIR / "uploads"
UPLOAD_FOLDER.mkdir(exist_ok=True)

# Maximum accepted request body size (16 MB).
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 ** 2
# ----------------- Error-handling decorator -----------------
def handle_errors(f):
    """Wrap a view so that uncaught exceptions become JSON error responses.

    Exceptions raised by ``f`` are mapped as follows:

    * ``FileNotFoundError`` -> HTTP 404
    * ``ValueError``        -> HTTP 400, with the exception text in the body
    * any other exception   -> HTTP 500, logged together with its traceback

    When ``f`` does not raise, its return value is passed through untouched.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped function (metadata preserved via ``functools.wraps``).
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except FileNotFoundError:
            return jsonify({'error': '文件不存在'}), 404
        except ValueError as e:
            return jsonify({'error': f'数据格式错误: {str(e)}'}), 400
        except Exception as e:
            # NOTE(review): this broad handler presumably also intercepts the
            # framework's own HTTP errors (e.g. the 413 raised when the request
            # exceeds MAX_CONTENT_LENGTH) and reports them as 500 - confirm
            # whether those should be re-raised instead.
            # logger.exception records the traceback, not only the message,
            # so unexpected failures can actually be diagnosed from the log.
            app.logger.exception(f'Error in {f.__name__}: {str(e)}')
            return jsonify({'error': f'服务器错误: {str(e)}'}), 500
    return decorated_function
# ----------------- Helpers -----------------
def safe_filename(name: str) -> str:
    """Reduce a client-supplied name to a safe ``.json`` file name.

    Directory components are dropped, every character that is not a word
    character, ``-``, ``_`` or ``.`` is replaced by ``_``, and ``.json`` is
    appended unless the name already ends with it.

    Returns:
        The sanitized file name, or ``''`` when ``name`` is empty/falsy.
    """
    if not name:
        return ''
    # Keep only the final path component.
    base = os.path.basename(name)
    # Neutralise anything outside the allowed character set.
    sanitized = re.sub(r'[^\w\-_.]', '_', base)
    return sanitized if sanitized.endswith('.json') else sanitized + '.json'
def read_json_file(path: Path):
    """Load a JSON file and return its content when it is a list.

    Any failure is reported through the application logger rather than
    raised.

    Returns:
        The decoded list, or ``[]`` when the file is missing, cannot be read
        or decoded, or holds something other than a list.
    """
    if not path.exists():
        return []
    try:
        with path.open('r', encoding='utf-8') as handle:
            loaded = json.load(handle)
    except json.JSONDecodeError:
        app.logger.warning(f'Invalid JSON in {path}')
        return []
    except Exception as e:
        app.logger.error(f'Error reading {path}: {str(e)}')
        return []
    if isinstance(loaded, list):
        return loaded
    return []
def write_json_file(path: Path, data):
    """Write ``data`` to ``path`` as UTF-8 JSON, keeping a backup of the old file.

    The data is serialized *before* anything on disk is touched, and the new
    content is written to a sibling temporary file that is then moved over
    ``path``. A value that cannot be serialized, or a failure in the middle of
    the write, therefore leaves the existing file intact instead of truncated.

    Args:
        path: Destination file.
        data: Any JSON-serializable value.

    Raises:
        TypeError / ValueError: If ``data`` cannot be serialized to JSON.
        OSError: If the file cannot be written.
    """
    import shutil

    # Serialize first: a failure here must not cost us the current file.
    serialized = json.dumps(data, ensure_ascii=False, indent=2)

    # Best-effort backup of the existing file; a failed backup is only logged.
    if path.exists():
        backup_path = path.with_suffix('.json.bak')
        try:
            shutil.copy2(path, backup_path)
        except Exception as e:
            app.logger.warning(f'Backup failed: {str(e)}')

    # Write next to the target, then swap the finished file into place.
    tmp_path = path.with_name(path.name + '.tmp')
    try:
        with tmp_path.open('w', encoding='utf-8') as f:
            f.write(serialized)
        os.replace(tmp_path, path)
    except Exception:
        tmp_path.unlink(missing_ok=True)
        raise
def validate_record(record: dict) -> tuple[bool, str]:
    """Check that a suspect record carries the mandatory fields.

    A record must be a JSON object (``dict``) with non-empty ``name`` and
    ``phone`` values; ``phone`` must be a string made only of digits, ``-``,
    ``*`` and whitespace (masked numbers are accepted).

    Malformed input (a non-dict record, a non-string phone) is reported as a
    validation failure instead of raising, so callers can answer with a
    client error.

    Returns:
        ``(True, '')`` when the record is valid, otherwise ``(False, reason)``.
    """
    if not isinstance(record, dict):
        return False, '记录必须是 JSON 对象'

    for field in ('name', 'phone'):
        if not record.get(field):
            return False, f'缺少必填字段: {field}'

    # Loose phone-format check; non-string values cannot be matched later on.
    phone = record['phone']
    if not isinstance(phone, str) or not re.match(r'^[\d\-\*\s]+$', phone):
        return False, '手机号格式不正确'
    return True, ''
def process_order_ids(order_id_str):
    """Split a comma-separated order-id string.

    Returns:
        A pair ``(full_order_ids, suffixes)``: the trimmed, non-empty ids in
        input order, and the last four characters of every id that is at
        least four characters long. Both lists are empty for falsy input.
    """
    if not order_id_str:
        return [], []
    trimmed = (piece.strip() for piece in str(order_id_str).split(','))
    full_order_ids = [oid for oid in trimmed if oid]
    suffixes = [oid[-4:] for oid in full_order_ids if len(oid) >= 4]
    return full_order_ids, suffixes
# ---------- Excel -> orders ----------
def _cell_to_text(value):
    """Render one spreadsheet cell as text: empty -> '', 123.0 -> '123'."""
    if pd.isna(value):
        return ""
    # A numeric column containing blanks is read as float; drop the ".0".
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return str(value)


def process_excel_to_json(file_path):
    """Convert an uploaded Excel sheet into a list of order dicts.

    Chinese column headers are mapped to internal field names, the
    province/city/district columns are joined (space separated) into
    ``address``, rows lacking a name or phone are dropped, and price-adjustment
    items are filtered out. ``name``, ``phone`` and ``order_id`` are always
    returned as strings because the matching helpers only accept ``str``.

    Args:
        file_path: Path of the ``.xlsx`` file to read.

    Returns:
        A list of dicts holding whichever of ``name``, ``phone``, ``address``,
        ``store``, ``order_id``, ``product``, ``sku`` and ``price`` are present.

    Raises:
        ValueError: If the file cannot be read or contains no rows.
    """
    try:
        df = pd.read_excel(file_path)
    except Exception as e:
        raise ValueError(f'Excel 文件读取失败: {str(e)}') from e
    if df.empty:
        raise ValueError('Excel 文件为空')

    rename_map = {
        "收件人": "name",
        "收件电话": "phone",
        # The three region columns need distinct names; mapping them all to
        # the same name would create duplicate columns and break the join.
        "收件省": "province",
        "收件市": "city",
        "收件区": "district",
        "店铺": "store",
        "平台订单号": "order_id",
        "商品名称": "product",
        "商品名": "product",
        "规格名称": "sku",
        "实收金额": "price",
    }
    df = df.rename(columns={c: rename_map[c] for c in df.columns if c in rename_map})
    # Two headers can map to the same field (both product headers present);
    # keep the first so that df[field] is always a single column.
    df = df.loc[:, ~df.columns.duplicated()]

    # Build the address from whichever region columns exist.
    region_cols = [c for c in ("province", "city", "district") if c in df.columns]
    if region_cols:
        df["address"] = (
            df[region_cols].fillna("").astype(str).agg(" ".join, axis=1).str.strip()
        )
    elif "address" not in df.columns:
        df["address"] = ""

    # Drop rows that lack a mandatory field.
    required = [c for c in ("name", "phone") if c in df.columns]
    if required:
        df = df.dropna(subset=required, how='any')

    # Filter out price-adjustment ("make up the difference") items.
    if "product" in df.columns:
        df = df[~df["product"].astype(str).str.contains("补收差价|补差价专用", na=False)]

    # Work on a copy so the assignments below do not target a filtered view.
    df = df.copy()
    for col in ("name", "phone", "order_id"):
        if col in df.columns:
            df[col] = df[col].map(_cell_to_text)

    # Keep only the fields the rest of the application uses.
    cols = ["name", "phone", "address", "store", "order_id", "product", "sku", "price"]
    return df[[c for c in cols if c in df.columns]].to_dict(orient="records")
# ---------- Matching helpers ----------
def get_suffix(order_id):
    """Return the last four characters of an order id.

    Returns ``None`` when ``order_id`` is not a string or is shorter than
    four characters.
    """
    if not isinstance(order_id, str):
        return None
    if len(order_id) < 4:
        return None
    return order_id[-4:]
def clean_masked(text):
    """Strip masking asterisks and surrounding whitespace from ``text``.

    Non-string input yields an empty string.
    """
    return text.replace("*", "").strip() if isinstance(text, str) else ""
def get_city(address):
    """Extract the city name (the first "...市" component) from an address.

    The run of Chinese characters before "市" is not allowed to cross a
    province ("省"), autonomous-region/district ("区") or city ("市") marker.
    An unspaced address such as "浙江省杭州市西湖区" therefore yields "杭州市",
    the same result as for "浙江省 杭州市 西湖区", instead of the province-
    prefixed "浙江省杭州市" that a plain greedy match would return.

    Returns:
        The city name including the trailing "市", or ``""`` when the input
        is not a string or contains no city.
    """
    if not isinstance(address, str):
        return ""
    match = re.search(r"((?:(?![省区市])[\u4e00-\u9fa5])+市)", address)
    return match.group(1) if match else ""
def name_match(a, b):
    """Return True when both names, once unmasked, start with the same character.

    Either name being empty after unmasking counts as no match.
    """
    first, second = clean_masked(a), clean_masked(b)
    if not first or not second:
        return False
    return first[0] == second[0]
def phone_last_digit(phone):
    """Return the final digit found in ``phone``.

    Returns ``""`` when ``phone`` is not a string or contains no digit.
    """
    if not isinstance(phone, str):
        return ""
    # Fall back to a one-element list so that [-1] is always valid.
    return (re.findall(r"\d", phone) or [""])[-1]
def phone_match(a, b):
    """Return True when both phone numbers end in the same digit.

    A number without any digit never matches.
    """
    tail_a = phone_last_digit(a)
    if not tail_a:
        return False
    # tail_a is non-empty here, so an empty tail for b cannot compare equal.
    return tail_a == phone_last_digit(b)
def calc_match_score(order, suspect):
    """Score how closely an order resembles a suspect record.

    Points are awarded only when the addresses are identical or share a city:

    * identical address: 6, plus 4 (order-id suffix), 2 (name), 1 (phone)
    * same city only:    3, plus 2 (order-id suffix), 1 (name), 1 (phone)

    Returns:
        ``(score, percentage, same_address, same_city)`` where ``percentage``
        is the score relative to the maximum of 13, truncated to an int.
    """
    max_score = 13
    order_addr = (order.get("address") or "").strip()
    suspect_addr = (suspect.get("address") or "").strip()

    # An exact address match takes precedence; the city is compared otherwise.
    same_address = bool(order_addr and suspect_addr and order_addr == suspect_addr)
    same_city = False
    if not same_address:
        order_city = get_city(order_addr)
        same_city = bool(order_city) and order_city == get_city(suspect_addr)

    if same_address:
        score, suffix_pts, name_pts, phone_pts = 6, 4, 2, 1
    elif same_city:
        score, suffix_pts, name_pts, phone_pts = 3, 2, 1, 1
    else:
        return 0, 0, False, False

    # Suspect order ids are compared through their stored "suffixes" field.
    suspect_suffixes = suspect.get("suffixes", [])
    order_suffix = get_suffix(order.get("order_id", ""))
    if order_suffix and order_suffix in suspect_suffixes:
        score += suffix_pts
    if name_match(order.get("name"), suspect.get("name")):
        score += name_pts
    if phone_match(order.get("phone"), suspect.get("phone")):
        score += phone_pts

    percentage = int((score / max_score) * 100)
    return score, percentage, same_address, same_city
def match_orders_with_suspects(orders, suspects):
    """Match every order against every suspect and group the hits.

    Returns:
        ``(full_matches, city_matches)``. ``full_matches`` is a list of
        ``{"address", "customers"}`` groups for exact address matches;
        ``city_matches`` is a list of ``{"city", "customers"}`` groups for
        city-level matches. Within a group, customers are keyed by suspect
        name and each carries the list of orders that matched it.
    """
    def customer_entry(suspect):
        # Fresh summary of a suspect, ready to collect matched orders.
        return {
            "name": suspect.get("name", ""),
            "phone": suspect.get("phone", ""),
            "address": suspect.get("address", ""),
            "store": suspect.get("store", ""),
            "registertime": suspect.get("registertime", ""),
            "order_id": suspect.get("full_order_ids", []),  # full order ids
            "note": suspect.get("note", ""),
            "info_url": suspect.get("info_url", ""),
            "matched_orders": []
        }

    full_grouped = {}
    city_grouped = {}

    for order in orders:
        for suspect in suspects:
            score, percentage, addr_match, city_match = calc_match_score(order, suspect)
            if score <= 0:
                continue

            # Pick the bucket (by address or by city) this hit belongs to.
            if addr_match:
                bucket = full_grouped.setdefault(order.get("address", ""), {})
            elif city_match:
                city = get_city(order.get("address", ""))
                if not city:
                    continue
                bucket = city_grouped.setdefault(city, {})
            else:
                continue

            customer_key = suspect.get("name", "") or "未知"
            customer = bucket.setdefault(customer_key, customer_entry(suspect))
            customer["matched_orders"].append({
                "orderId": order.get("order_id", ""),
                "orderName": order.get("name", ""),
                "orderPhone": order.get("phone", ""),
                "score": score,
                "percentage": percentage
            })

    full_matches = [
        {"address": addr, "customers": list(customers.values())}
        for addr, customers in full_grouped.items()
    ]
    city_matches = [
        {"city": city, "customers": list(customers.values())}
        for city, customers in city_grouped.items()
    ]
    return full_matches, city_matches
# ----------------- Routes -----------------
@app.route('/')
def index():
    """Serve the front-end entry page, ``index.html`` from the static folder."""
    entry_page = 'index.html'
    return send_from_directory('static', entry_page)
@app.route('/<path:path>')
def static_files(path):
    """Serve the requested path as a file from the ``static`` directory."""
    static_dir = 'static'
    return send_from_directory(static_dir, path)
@app.route('/api/suspects', methods=['GET'])
@handle_errors
def list_suspects():
    """List the suspect-library files (``*.json``) in sorted order."""
    names = [entry for entry in os.listdir(SUSPECTS_FOLDER) if entry.endswith('.json')]
    names.sort()
    return jsonify({'files': names})
@app.route('/api/suspects/<path:filename>', methods=['GET', 'POST', 'PUT', 'DELETE'])
@handle_errors
def handle_suspect_file(filename):
    """CRUD operations on one suspect-library file.

    * ``GET``    - return the records as ``{'data': [...]}``.
    * ``POST``   - create the file (optionally seeded from the body's
      ``data`` list); 400 if it already exists.
    * ``PUT``    - replace the content with the body's ``data`` list, or
      with the body itself when the body is a bare list.
    * ``DELETE`` - remove the file; 404 if it does not exist.

    A library file always holds a list of records. Content that is not a
    list is rejected with 400: the reader treats a non-list file as empty,
    so the next record insert would silently overwrite whatever was stored.
    """
    safe_name = safe_filename(filename)
    file_path = SUSPECTS_FOLDER / safe_name

    if request.method == 'GET':
        return jsonify({'data': read_json_file(file_path)})

    if request.method == 'POST':
        if file_path.exists():
            return jsonify({'error': '文件已存在'}), 400
        payload = request.get_json(silent=True) or {}
        initial_data = payload.get('data', []) if isinstance(payload, dict) else []
        if not isinstance(initial_data, list):
            return jsonify({'error': 'data 必须是列表'}), 400
        write_json_file(file_path, initial_data)
        return jsonify({'message': '创建成功', 'filename': safe_name})

    if request.method == 'PUT':
        payload = request.get_json(silent=True)
        if payload is None:
            return jsonify({'error': '无有效 JSON 数据'}), 400
        # Accept either {"data": [...]} or a bare list as the new content.
        data_to_write = payload.get('data', []) if isinstance(payload, dict) else payload
        if not isinstance(data_to_write, list):
            return jsonify({'error': 'data 必须是列表'}), 400
        write_json_file(file_path, data_to_write)
        return jsonify({'message': '更新成功'})

    if request.method == 'DELETE':
        if not file_path.exists():
            return jsonify({'error': '文件不存在'}), 404
        file_path.unlink()
        return jsonify({'message': '删除成功'})

    # Defensive fallback; the route only registers the four methods above.
    return jsonify({'error': '不支持的方法'}), 405
@app.route('/api/suspects/<path:filename>/records', methods=['POST'])
@handle_errors
def add_suspect_record(filename):
    """Append one record to a suspect-library file.

    The JSON body must be an object that passes ``validate_record``. The
    stored record gets a ``registertime`` (current local time unless
    supplied) and its ``order_id`` is normalized into ``full_order_ids``,
    ``suffixes`` and a comma-joined ``order_id`` string.

    Responses: 404 when the file does not exist, 400 for a missing, non-object
    or invalid body, otherwise the stored record.
    """
    safe_name = safe_filename(filename)
    file_path = SUSPECTS_FOLDER / safe_name
    if not file_path.exists():
        return jsonify({'error': '文件不存在'}), 404

    payload = request.get_json(silent=True)
    if not payload:
        return jsonify({'error': '缺少请求体'}), 400
    # A JSON array or scalar would otherwise crash the field access below.
    if not isinstance(payload, dict):
        return jsonify({'error': '记录必须是 JSON 对象'}), 400

    is_valid, error_msg = validate_record(payload)
    if not is_valid:
        return jsonify({'error': error_msg}), 400

    data = read_json_file(file_path)
    new_record = payload

    # Stamp the registration time unless the client supplied one.
    if not new_record.get('registertime'):
        new_record['registertime'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # Store the full order ids and their 4-character suffixes; order_id stays
    # a comma-joined string for backward compatibility. An empty or missing
    # order_id yields two empty lists and an empty string.
    full_order_ids, suffixes = process_order_ids(new_record.get('order_id'))
    new_record['full_order_ids'] = full_order_ids
    new_record['suffixes'] = suffixes
    new_record['order_id'] = ','.join(full_order_ids)

    data.append(new_record)
    write_json_file(file_path, data)
    return jsonify({'message': '添加成功', 'data': new_record})
@app.route('/api/suspects/<path:filename>/records/<int:index>', methods=['PUT'])
@handle_errors
def update_suspect_record(filename, index):
    """Replace the record at ``index`` in a suspect-library file.

    The JSON body must be an object that passes ``validate_record``. The
    original ``registertime`` is kept (or set to the current local time when
    the stored record has none) and ``order_id`` is normalized into
    ``full_order_ids``, ``suffixes`` and a comma-joined ``order_id`` string.

    Responses: 404 when the file does not exist, 400 for a missing, non-object
    or invalid body or an out-of-range index.
    """
    safe_name = safe_filename(filename)
    file_path = SUSPECTS_FOLDER / safe_name
    if not file_path.exists():
        return jsonify({'error': '文件不存在'}), 404

    payload = request.get_json(silent=True)
    if payload is None:
        return jsonify({'error': '无有效 JSON 数据'}), 400
    # A JSON array or scalar would otherwise crash the field access below.
    if not isinstance(payload, dict):
        return jsonify({'error': '记录必须是 JSON 对象'}), 400

    is_valid, error_msg = validate_record(payload)
    if not is_valid:
        return jsonify({'error': error_msg}), 400

    data = read_json_file(file_path)
    if index < 0 or index >= len(data):
        return jsonify({'error': '索引超出范围'}), 400

    updated_record = payload

    # Keep the original registration time; tolerate a malformed stored entry.
    now_text = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    previous = data[index]
    if isinstance(previous, dict):
        updated_record['registertime'] = previous.get('registertime', now_text)
    else:
        updated_record['registertime'] = now_text

    # Store the full order ids and their 4-character suffixes; order_id stays
    # a comma-joined string for backward compatibility. An empty or missing
    # order_id yields two empty lists and an empty string.
    full_order_ids, suffixes = process_order_ids(updated_record.get('order_id'))
    updated_record['full_order_ids'] = full_order_ids
    updated_record['suffixes'] = suffixes
    updated_record['order_id'] = ','.join(full_order_ids)

    data[index] = updated_record
    write_json_file(file_path, data)
    return jsonify({'message': '更新成功'})
@app.route('/api/suspects/<path:filename>/records/<int:index>', methods=['DELETE'])
@handle_errors
def delete_suspect_record(filename, index):
    """Remove the record at ``index`` from a suspect-library file.

    Responses: 404 when the file does not exist, 400 when the index is out
    of range.
    """
    target = SUSPECTS_FOLDER / safe_filename(filename)
    if not target.exists():
        return jsonify({'error': '文件不存在'}), 404

    records = read_json_file(target)
    if not 0 <= index < len(records):
        return jsonify({'error': '索引超出范围'}), 400

    del records[index]
    write_json_file(target, records)
    return jsonify({'message': '删除成功'})
@app.route('/api/analyze', methods=['POST'])
@handle_errors
def analyze_orders():
    """Analyze an uploaded order sheet against a suspect library.

    Expects a multipart form with ``file`` (an ``.xlsx`` workbook) and
    ``suspectFile`` (the library file name).

    Returns:
        JSON with ``totalOrders``, ``fullMatches`` and ``cityMatches``;
        400 for a missing or invalid upload, 404 when the library file does
        not exist.
    """
    import uuid

    if 'file' not in request.files:
        return jsonify({'error': '未上传文件'}), 400
    if 'suspectFile' not in request.form:
        return jsonify({'error': '未选择恶意客户库'}), 400

    uploaded_file = request.files['file']
    suspect_file = request.form['suspectFile']
    if not uploaded_file.filename:
        return jsonify({'error': '文件名为空'}), 400
    # Case-insensitive so that "ORDERS.XLSX" is accepted as well.
    if not uploaded_file.filename.lower().endswith('.xlsx'):
        return jsonify({'error': '仅支持 .xlsx 格式'}), 400

    # The client-supplied file name is untrusted: joined onto UPLOAD_FOLDER it
    # could contain "../" or be absolute and so write (and later delete) a
    # file outside the upload directory. Use a server-generated name instead;
    # this also keeps concurrent uploads of the same name apart.
    save_path = UPLOAD_FOLDER / f'{uuid.uuid4().hex}.xlsx'

    try:
        uploaded_file.save(str(save_path))

        # Parse the orders.
        orders = process_excel_to_json(str(save_path))

        # Load the suspect library.
        safe_name = safe_filename(suspect_file)
        suspect_path = SUSPECTS_FOLDER / safe_name
        if not suspect_path.exists():
            return jsonify({'error': '恶意客户库文件不存在'}), 404
        # Skip malformed (non-object) entries rather than fail the whole run.
        suspects = [s for s in read_json_file(suspect_path) if isinstance(s, dict)]

        # Backward compatibility: derive full_order_ids / suffixes for
        # records stored before those fields existed.
        for suspect in suspects:
            if 'full_order_ids' not in suspect or 'suffixes' not in suspect:
                full_order_ids, suffixes = process_order_ids(suspect.get('order_id', ''))
                suspect['full_order_ids'] = full_order_ids
                suspect['suffixes'] = suffixes

        full_grouped, city_grouped = match_orders_with_suspects(orders, suspects)
        return jsonify({
            'totalOrders': len(orders),
            'fullMatches': full_grouped,
            'cityMatches': city_grouped
        })
    finally:
        # Always remove the temporary upload; a failure here is only logged.
        try:
            if save_path.exists():
                save_path.unlink()
        except Exception as e:
            app.logger.warning(f'Failed to delete temp file: {str(e)}')
# ----------------- Entry point -----------------
if __name__ == '__main__':
    # Development server only; in production run under Gunicorn or uWSGI.
    # Debug mode is on only when FLASK_DEBUG equals "true" (case-insensitive).
    debug_mode = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true'
    app.run(debug=debug_mode, port=5000, host='0.0.0.0')