添加 main.py

This commit is contained in:
2025-10-07 23:18:45 +08:00
parent 3af14918b3
commit c32600730d

531
main.py Normal file
View File

@@ -0,0 +1,531 @@
#!/usr/bin/env python3
# coding: utf-8
from flask import Flask, request, jsonify, send_from_directory
from flask_cors import CORS
import pandas as pd
import json
import re
import os
from datetime import datetime
from pathlib import Path
from functools import wraps
# Flask application; static_url_path='' exposes files under ./static at the site root.
app = Flask(__name__, static_folder='static', static_url_path='')
# Enable CORS through flask-cors using its default settings.
CORS(app)

# Storage locations, resolved relative to this source file.
BASE_DIR = Path(__file__).resolve().parent
SUSPECTS_FOLDER = BASE_DIR / "suspects"
SUSPECTS_FOLDER.mkdir(exist_ok=True)
UPLOAD_FOLDER = BASE_DIR / "uploads"
UPLOAD_FOLDER.mkdir(exist_ok=True)

# Maximum accepted request body size (16 MB).
app.config.update(MAX_CONTENT_LENGTH=16 * 1024 * 1024)
# ----------------- 错误处理装饰器 -----------------
def handle_errors(f):
    """Decorate a view so uncaught exceptions become JSON error responses.

    Exceptions raised by ``f`` are translated as follows:

    * ``FileNotFoundError`` -> 404.
    * ``ValueError`` -> 400, with the exception text appended to the message.
    * HTTP-style exceptions (objects exposing an integer ``code`` and a
      callable ``get_response``, the shape of Werkzeug's ``HTTPException``)
      are re-raised untouched so the framework can answer with their own
      status, e.g. 413 for an oversized upload.
    * Anything else is logged with its traceback and answered with 500.

    Returns the wrapped callable; ``functools.wraps`` keeps the metadata of
    ``f``.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except FileNotFoundError:
            return jsonify({'error': '文件不存在'}), 404
        except ValueError as e:
            return jsonify({'error': f'数据格式错误: {str(e)}'}), 400
        except Exception as e:
            # Duck-typed check keeps this block free of a direct werkzeug
            # import while still letting HTTP errors keep their status code.
            if isinstance(getattr(e, 'code', None), int) and callable(getattr(e, 'get_response', None)):
                raise
            # logger.exception records the traceback, not just the message.
            app.logger.exception(f'Error in {f.__name__}: {str(e)}')
            return jsonify({'error': f'服务器错误: {str(e)}'}), 500
    return decorated_function
# ----------------- Helpers -----------------
def safe_filename(name: str) -> str:
    """Reduce a client-supplied name to a bare ``*.json`` file name.

    Directory components are dropped, every character that is not a word
    character, ``-``, ``_`` or ``.`` is replaced by ``_``, and ``.json`` is
    appended when the result does not already end with it. A falsy ``name``
    yields the empty string.
    """
    if not name:
        return ''
    base = os.path.basename(name)
    sanitized = re.sub(r'[^\w\-_.]', '_', base)
    return sanitized if sanitized.endswith('.json') else sanitized + '.json'
def read_json_file(path: Path):
    """Load a JSON list from ``path``, falling back to an empty list.

    An empty list is returned when the file does not exist, when its content
    is not valid JSON (logged as a warning), when reading fails for any other
    reason (logged as an error), or when the decoded value is not a list.
    """
    if not path.exists():
        return []
    try:
        loaded = json.loads(path.read_text(encoding='utf-8'))
    except json.JSONDecodeError:
        app.logger.warning(f'Invalid JSON in {path}')
        return []
    except Exception as e:
        app.logger.error(f'Error reading {path}: {str(e)}')
        return []
    if isinstance(loaded, list):
        return loaded
    return []
def write_json_file(path: Path, data):
    """Write ``data`` to ``path`` as pretty-printed UTF-8 JSON, keeping a backup.

    The data is serialized before any file is touched, so an unserializable
    value raises (``TypeError``/``ValueError`` from ``json.dumps``) without
    damaging the existing file. If ``path`` already exists it is first copied
    to a sibling with the ``.json.bak`` suffix; a failed backup is logged and
    does not abort the write. The new content goes to a temporary sibling file
    that is then moved over ``path`` with ``os.replace``, so readers never
    observe a half-written file.
    """
    serialized = json.dumps(data, ensure_ascii=False, indent=2)

    if path.exists():
        backup_path = path.with_suffix('.json.bak')
        try:
            import shutil
            shutil.copy2(path, backup_path)
        except Exception as e:
            # Best effort: the backup is a convenience, not a precondition.
            app.logger.warning(f'Backup failed: {str(e)}')

    tmp_path = path.with_name(path.name + '.tmp')
    try:
        tmp_path.write_text(serialized, encoding='utf-8')
        os.replace(tmp_path, path)
    finally:
        # After a successful replace the temp file is gone; this only cleans
        # up when the write or the replace failed.
        tmp_path.unlink(missing_ok=True)
def validate_record(record: dict) -> tuple[bool, str]:
    """Check that a suspect record carries the required fields.

    Args:
        record: Decoded JSON body of the request. Anything other than a dict
            is rejected instead of raising.

    Returns:
        ``(True, '')`` when the record is acceptable, otherwise
        ``(False, message)`` with a user-facing explanation.

    A record needs non-empty ``name`` and ``phone`` values. The phone may
    contain only digits, ``-``, ``*`` and whitespace; non-string phones (for
    example a JSON number) are checked through their string form.
    """
    if not isinstance(record, dict):
        return False, '记录必须是 JSON 对象'

    for field in ('name', 'phone'):
        if not record.get(field):
            return False, f'缺少必填字段: {field}'

    # Loose format check only: masked numbers such as 138****0001 are valid.
    if not re.match(r'^[\d\-\*\s]+$', str(record['phone'])):
        return False, '手机号格式不正确'
    return True, ''
def process_order_ids(order_id_str):
    """Split order ids into the full ids and their 4-character suffixes.

    Args:
        order_id_str: A string of ids separated by ASCII (``,``) or
            full-width (``,``) commas, or a list/tuple of such strings.
            Any other non-empty value is converted with ``str`` first.

    Returns:
        ``(full_order_ids, suffixes)``. ``full_order_ids`` holds every
        non-blank id with surrounding whitespace removed; ``suffixes`` holds
        the last four characters of each id that is at least four characters
        long. Both lists are empty for falsy input.
    """
    if not order_id_str:
        return [], []

    if isinstance(order_id_str, (list, tuple)):
        chunks = [str(item) for item in order_id_str if item is not None]
    else:
        chunks = [str(order_id_str)]

    full_order_ids = []
    for chunk in chunks:
        for piece in re.split(r'[,,]', chunk):
            piece = piece.strip()
            if piece:
                full_order_ids.append(piece)

    suffixes = [oid[-4:] for oid in full_order_ids if len(oid) >= 4]
    return full_order_ids, suffixes
# ---------- Excel -> orders ----------
def _cell_to_text(value):
    """Render one spreadsheet cell as clean text ('' for missing values)."""
    if isinstance(value, str):
        return value.strip()
    if value is None or pd.isna(value):
        return ""
    # Numeric cells holding whole numbers must not gain a trailing ".0".
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return str(value)


def process_excel_to_json(file_path):
    """Convert an uploaded order spreadsheet into a list of order dicts.

    Chinese column headers are mapped to internal field names. The province,
    city and district columns (whichever are present) are joined with single
    spaces into ``address``. Rows lacking a name or phone are dropped, rows
    whose product is a price-difference placeholder are filtered out, and
    ``name``/``phone``/``order_id`` are normalized to text so that numeric
    spreadsheet cells remain usable as strings.

    Args:
        file_path: Path of the Excel file to read.

    Returns:
        A list of dicts with the available keys among ``name``, ``phone``,
        ``address``, ``store``, ``order_id``, ``product``, ``sku``, ``price``.

    Raises:
        ValueError: If the file cannot be read or contains no rows.
    """
    try:
        df = pd.read_excel(file_path)
    except Exception as e:
        raise ValueError(f'Excel 文件读取失败: {str(e)}') from e
    if df.empty:
        raise ValueError('Excel 文件为空')

    rename_map = {
        "收件人": "name",
        "收件电话": "phone",
        "收件省": "province",
        "收件市": "city",
        "收件区": "district",
        "店铺": "store",
        "平台订单号": "order_id",
        "商品名称": "product",
        "商品名": "product",
        "规格名称": "sku",
        "实收金额": "price",
    }
    # When two headers map to the same field, only the first is renamed, so
    # the frame never ends up with duplicate column names.
    columns = {}
    taken = set()
    for col in df.columns:
        target = rename_map.get(col)
        if target is not None and target not in taken:
            columns[col] = target
            taken.add(target)
    df = df.rename(columns=columns)

    # Build the address from the region columns, skipping blank parts.
    address_parts = [c for c in ("province", "city", "district") if c in df.columns]
    if address_parts:
        df["address"] = df[address_parts].apply(
            lambda row: " ".join(filter(None, (_cell_to_text(v) for v in row))),
            axis=1,
        )
    elif "address" not in df.columns:
        df["address"] = ""

    # Drop rows that lack a required field.
    required = [c for c in ("name", "phone") if c in df.columns]
    if required:
        df = df.dropna(subset=required, how='any')

    # Work on a copy so the column assignments below never target a view.
    df = df.copy()
    for col in ("name", "phone", "order_id"):
        if col in df.columns:
            df[col] = df[col].map(_cell_to_text)

    # Filter out price-difference placeholder products.
    if "product" in df.columns:
        df = df[~df["product"].astype(str).str.contains("补收差价|补差价专用", na=False)]

    # Keep only the fields used downstream, in a fixed order.
    cols = ["name", "phone", "address", "store", "order_id", "product", "sku", "price"]
    existing_cols = [c for c in cols if c in df.columns]
    return df[existing_cols].to_dict(orient="records")
# ---------- 匹配辅助函数 ----------
def get_suffix(order_id):
    """Return the last four characters of an order id, or None.

    Strings are stripped of surrounding whitespace first. Integers and
    whole-number floats (as produced when a spreadsheet stores the id in a
    numeric cell) are converted to their decimal text. None is returned for
    ids shorter than four characters and for any other input, including NaN
    and booleans.
    """
    if isinstance(order_id, bool):
        return None
    if isinstance(order_id, float):
        # NaN and infinities report is_integer() == False.
        if not order_id.is_integer():
            return None
        order_id = int(order_id)
    if isinstance(order_id, int):
        order_id = str(order_id)
    if not isinstance(order_id, str):
        return None
    order_id = order_id.strip()
    return order_id[-4:] if len(order_id) >= 4 else None
def clean_masked(text):
    """Remove masking asterisks and surrounding whitespace from ``text``.

    Non-string input yields the empty string.
    """
    if isinstance(text, str):
        return "".join(text.split("*")).strip()
    return ""
# Shortest run of CJK characters ending in "市" that contains neither "省"
# nor "区", so a leading province or autonomous-region name is not swallowed.
_CITY_RE = re.compile(r"((?:(?![省区])[\u4e00-\u9fa5])+?市)")


def get_city(address):
    """Extract the city name (ending in "市") from an address.

    The match is lazy and excludes "省"/"区", so "浙江省杭州市西湖区" and
    "浙江省 杭州市 西湖区" both yield "杭州市", and a later "市" in the street
    part is not absorbed into the city name.

    Returns the empty string for non-string input or when no city is found.
    """
    if not isinstance(address, str):
        return ""
    match = _CITY_RE.search(address)
    return match.group(1) if match else ""
def name_match(a, b):
    """Report whether two (possibly masked) names share the same first character.

    Both names are cleaned with ``clean_masked`` first; if either is empty
    after cleaning the result is False.
    """
    first, second = clean_masked(a), clean_masked(b)
    if not first or not second:
        return False
    return first[0] == second[0]
def phone_last_digit(phone):
    """Return the last digit found in a phone number, or '' if there is none.

    Strings may contain masking or separator characters; only digits are
    considered. Integers and whole-number floats (as produced when a
    spreadsheet stores the phone in a numeric cell) are converted to text
    first. Any other input, including NaN and booleans, yields ''.
    """
    if isinstance(phone, bool):
        return ""
    if isinstance(phone, float):
        # NaN and infinities report is_integer() == False.
        if not phone.is_integer():
            return ""
        phone = int(phone)
    if isinstance(phone, int):
        phone = str(phone)
    if not isinstance(phone, str):
        return ""
    digits = re.findall(r"\d", phone)
    return digits[-1] if digits else ""
def phone_match(a, b):
    """Report whether two phone numbers end in the same digit.

    Uses ``phone_last_digit`` on both values; False when either has no digit.
    """
    tail_a = phone_last_digit(a)
    if not tail_a:
        return False
    tail_b = phone_last_digit(b)
    return bool(tail_b) and tail_a == tail_b
def calc_match_score(order, suspect):
    """Score how strongly an order resembles a suspect record.

    Scoring (maximum 13):

    * identical non-empty address: 6 base points, then +4 for a matching
      order-id suffix, +2 for a name match, +1 for a phone match;
    * otherwise, same city: 3 base points, then +2 for a matching order-id
      suffix, +1 for a name match, +1 for a phone match;
    * neither: score 0.

    Returns:
        ``(score, percentage, same_address, same_city)`` where ``percentage``
        is the score as a truncated integer percentage of the maximum.
    """
    max_score = 13

    order_addr = (order.get("address") or "").strip()
    suspect_addr = (suspect.get("address") or "").strip()

    # The city is only compared when the full addresses do not match.
    same_address = bool(order_addr and suspect_addr and order_addr == suspect_addr)
    same_city = False
    if not same_address:
        order_city = get_city(order_addr)
        same_city = bool(order_city and order_city == get_city(suspect_addr))

    # Suspect order ids are compared through the precomputed "suffixes" field.
    suspect_suffixes = suspect.get("suffixes", [])
    order_suffix = get_suffix(order.get("order_id", ""))

    if same_address:
        score, suffix_points, name_points, phone_points = 6, 4, 2, 1
    elif same_city:
        score, suffix_points, name_points, phone_points = 3, 2, 1, 1
    else:
        return 0, 0, same_address, same_city

    if order_suffix and order_suffix in suspect_suffixes:
        score += suffix_points
    if name_match(order.get("name"), suspect.get("name")):
        score += name_points
    if phone_match(order.get("phone"), suspect.get("phone")):
        score += phone_points

    percentage = int((score / max_score) * 100) if score > 0 else 0
    return score, percentage, same_address, same_city
def _suspect_summary(suspect):
    """Build the per-suspect result entry with an empty matched-order list."""
    return {
        "name": suspect.get("name", ""),
        "phone": suspect.get("phone", ""),
        "address": suspect.get("address", ""),
        "store": suspect.get("store", ""),
        "registertime": suspect.get("registertime", ""),
        # Full order ids are returned to the client, not the suffixes.
        "order_id": suspect.get("full_order_ids", []),
        "note": suspect.get("note", ""),
        "info_url": suspect.get("info_url", ""),
        "matched_orders": [],
    }


def match_orders_with_suspects(orders, suspects):
    """Match every order against every suspect and group the hits.

    Pairs scoring above zero are grouped twice over: full-address matches by
    the order's address, city-level matches by the order's city. Within a
    group, hits are collected per suspect name ("未知" when the name is empty);
    the first suspect seen under a name supplies that entry's details.

    Returns:
        ``(full_matches, city_matches)``: lists of
        ``{"address": ..., "customers": [...]}`` and
        ``{"city": ..., "customers": [...]}`` dicts respectively.
    """
    by_address = {}
    by_city = {}

    for order in orders:
        for suspect in suspects:
            score, percentage, addr_match, city_match = calc_match_score(order, suspect)
            if score <= 0:
                continue

            if addr_match:
                bucket = by_address.setdefault(order.get("address", ""), {})
            elif city_match:
                city = get_city(order.get("address", ""))
                if not city:
                    continue
                bucket = by_city.setdefault(city, {})
            else:
                continue

            label = suspect.get("name", "") or "未知"
            if label not in bucket:
                bucket[label] = _suspect_summary(suspect)
            bucket[label]["matched_orders"].append({
                "orderId": order.get("order_id", ""),
                "orderName": order.get("name", ""),
                "orderPhone": order.get("phone", ""),
                "score": score,
                "percentage": percentage,
            })

    full_matches = []
    for addr, customers in by_address.items():
        full_matches.append({"address": addr, "customers": list(customers.values())})
    city_matches = []
    for city, customers in by_city.items():
        city_matches.append({"city": city, "customers": list(customers.values())})
    return full_matches, city_matches
# ----------------- Routes -----------------
@app.route('/')
def index():
    """Serve the front-end entry page, index.html, from the static directory."""
    entry_page = 'index.html'
    return send_from_directory('static', entry_page)
@app.route('/<path:path>')
def static_files(path):
    """Serve the requested file from the static directory."""
    static_dir = 'static'
    return send_from_directory(static_dir, path)
@app.route('/api/suspects', methods=['GET'])
@handle_errors
def list_suspects():
    """Return the sorted names of all ``.json`` library files in SUSPECTS_FOLDER."""
    names = [entry for entry in os.listdir(SUSPECTS_FOLDER) if entry.endswith('.json')]
    names.sort()
    return jsonify({'files': names})
@app.route('/api/suspects/<path:filename>', methods=['GET', 'POST', 'PUT', 'DELETE'])
@handle_errors
def handle_suspect_file(filename):
    """Create, read, replace or delete one suspect-library file.

    The file name from the URL is sanitized with ``safe_filename`` and
    resolved inside SUSPECTS_FOLDER.

    * GET: return the stored records (an empty list if the file is missing).
    * POST: create the file, optionally seeded from ``{"data": [...]}``;
      400 if it already exists.
    * PUT: replace the content with ``{"data": [...]}`` or a bare JSON list.
    * DELETE: remove the file; 404 if it does not exist.

    POST and PUT answer 400 when the data is not a list: a library file must
    hold a list, since ``read_json_file`` treats any other content as empty.
    """
    safe_name = safe_filename(filename)
    file_path = SUSPECTS_FOLDER / safe_name

    if request.method == 'GET':
        data = read_json_file(file_path)
        return jsonify({'data': data})

    if request.method == 'POST':
        if file_path.exists():
            return jsonify({'error': '文件已存在'}), 400
        payload = request.get_json(silent=True) or {}
        initial_data = payload.get('data', []) if isinstance(payload, dict) else []
        if not isinstance(initial_data, list):
            return jsonify({'error': '数据必须是列表'}), 400
        write_json_file(file_path, initial_data)
        return jsonify({'message': '创建成功', 'filename': safe_name})

    if request.method == 'PUT':
        payload = request.get_json(silent=True)
        if payload is None:
            return jsonify({'error': '无有效 JSON 数据'}), 400
        data_to_write = payload.get('data', []) if isinstance(payload, dict) else payload
        if not isinstance(data_to_write, list):
            return jsonify({'error': '数据必须是列表'}), 400
        write_json_file(file_path, data_to_write)
        return jsonify({'message': '更新成功'})

    if request.method == 'DELETE':
        if not file_path.exists():
            return jsonify({'error': '文件不存在'}), 404
        file_path.unlink()
        return jsonify({'message': '删除成功'})

    # Not reachable through the route's method list; kept as a safety net.
    return jsonify({'error': '不支持的方法'}), 405
@app.route('/api/suspects/<path:filename>/records', methods=['POST'])
@handle_errors
def add_suspect_record(filename):
    """Append one suspect record to an existing library file.

    The JSON body must be an object that passes ``validate_record``. A
    ``registertime`` stamp (local time, ``YYYY-MM-DD HH:MM:SS``) is added
    when missing or empty. The order ids are normalized into three fields:
    ``full_order_ids`` (list), ``suffixes`` (last four characters of each id)
    and ``order_id`` (the full ids joined with commas, kept for backward
    compatibility).

    Responds 404 if the file does not exist and 400 for a missing, non-object
    or invalid body; on success returns the stored record.
    """
    safe_name = safe_filename(filename)
    file_path = SUSPECTS_FOLDER / safe_name
    if not file_path.exists():
        return jsonify({'error': '文件不存在'}), 404

    payload = request.get_json(silent=True)
    if not payload:
        return jsonify({'error': '缺少请求体'}), 400
    # A JSON array or scalar would otherwise crash on .get() further down.
    if not isinstance(payload, dict):
        return jsonify({'error': '请求体必须是 JSON 对象'}), 400

    is_valid, error_msg = validate_record(payload)
    if not is_valid:
        return jsonify({'error': error_msg}), 400

    data = read_json_file(file_path)
    new_record = payload

    if not new_record.get('registertime'):
        new_record['registertime'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # process_order_ids returns two empty lists for a missing or empty value,
    # which gives the empty defaults without a separate branch.
    full_order_ids, suffixes = process_order_ids(new_record.get('order_id'))
    new_record['full_order_ids'] = full_order_ids
    new_record['suffixes'] = suffixes
    new_record['order_id'] = ','.join(full_order_ids)

    data.append(new_record)
    write_json_file(file_path, data)
    return jsonify({'message': '添加成功', 'data': new_record})
@app.route('/api/suspects/<path:filename>/records/<int:index>', methods=['PUT'])
@handle_errors
def update_suspect_record(filename, index):
    """Replace the suspect record at ``index`` in a library file.

    The JSON body must be an object that passes ``validate_record``. The
    existing record's ``registertime`` is preserved (the current local time
    is used if it has none), and the order ids are normalized into
    ``full_order_ids``, ``suffixes`` and the comma-joined ``order_id``.

    Responds 404 if the file does not exist and 400 for a missing, non-object
    or invalid body or an out-of-range index.
    """
    safe_name = safe_filename(filename)
    file_path = SUSPECTS_FOLDER / safe_name
    if not file_path.exists():
        return jsonify({'error': '文件不存在'}), 404

    payload = request.get_json(silent=True)
    if payload is None:
        return jsonify({'error': '无有效 JSON 数据'}), 400
    # A JSON array or scalar would otherwise crash on item assignment below.
    if not isinstance(payload, dict):
        return jsonify({'error': '请求体必须是 JSON 对象'}), 400

    is_valid, error_msg = validate_record(payload)
    if not is_valid:
        return jsonify({'error': error_msg}), 400

    data = read_json_file(file_path)
    if index < 0 or index >= len(data):
        return jsonify({'error': '索引超出范围'}), 400

    updated_record = payload
    # Keep the original registration time; a stored entry that is not a dict
    # simply has none to keep.
    existing = data[index] if isinstance(data[index], dict) else {}
    updated_record['registertime'] = existing.get(
        'registertime', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

    # process_order_ids returns two empty lists for a missing or empty value,
    # which gives the empty defaults without a separate branch.
    full_order_ids, suffixes = process_order_ids(updated_record.get('order_id'))
    updated_record['full_order_ids'] = full_order_ids
    updated_record['suffixes'] = suffixes
    updated_record['order_id'] = ','.join(full_order_ids)

    data[index] = updated_record
    write_json_file(file_path, data)
    return jsonify({'message': '更新成功'})
@app.route('/api/suspects/<path:filename>/records/<int:index>', methods=['DELETE'])
@handle_errors
def delete_suspect_record(filename, index):
    """Remove the record at ``index`` from a suspect-library file.

    Responds 404 if the file does not exist and 400 if the index is outside
    the stored list; otherwise rewrites the file without that record.
    """
    target = SUSPECTS_FOLDER / safe_filename(filename)
    if not target.exists():
        return jsonify({'error': '文件不存在'}), 404

    records = read_json_file(target)
    if not (0 <= index < len(records)):
        return jsonify({'error': '索引超出范围'}), 400

    del records[index]
    write_json_file(target, records)
    return jsonify({'message': '删除成功'})
@app.route('/api/analyze', methods=['POST'])
@handle_errors
def analyze_orders():
    """Match the orders of an uploaded spreadsheet against a suspect library.

    Expects a multipart form with ``file`` (an ``.xlsx`` workbook; the
    extension check ignores case) and ``suspectFile`` (name of a library in
    SUSPECTS_FOLDER).

    The upload is stored under a server-generated name, never the
    client-supplied one, so a crafted filename cannot escape UPLOAD_FOLDER
    and concurrent uploads cannot overwrite each other. The temporary file is
    removed when the request finishes, whatever the outcome.

    Returns JSON with ``totalOrders``, ``fullMatches`` and ``cityMatches``.
    Responds 400 for a missing or unsupported upload and 404 when the library
    file does not exist.
    """
    import uuid

    if 'file' not in request.files:
        return jsonify({'error': '未上传文件'}), 400
    if 'suspectFile' not in request.form:
        return jsonify({'error': '未选择恶意客户库'}), 400

    uploaded_file = request.files['file']
    suspect_file = request.form['suspectFile']
    if not uploaded_file.filename:
        return jsonify({'error': '文件名为空'}), 400
    if not uploaded_file.filename.lower().endswith('.xlsx'):
        return jsonify({'error': '仅支持 .xlsx 格式'}), 400

    # Store the upload under a random name; the client's name is untrusted.
    save_path = UPLOAD_FOLDER / f'{uuid.uuid4().hex}.xlsx'
    uploaded_file.save(str(save_path))
    try:
        orders = process_excel_to_json(str(save_path))

        safe_name = safe_filename(suspect_file)
        suspect_path = SUSPECTS_FOLDER / safe_name
        if not suspect_path.exists():
            return jsonify({'error': '恶意客户库文件不存在'}), 404
        suspects = read_json_file(suspect_path)

        # Backward compatibility: records written before full_order_ids and
        # suffixes existed get them derived from order_id on the fly.
        for suspect in suspects:
            if 'full_order_ids' not in suspect or 'suffixes' not in suspect:
                full_order_ids, suffixes = process_order_ids(suspect.get('order_id', ''))
                suspect['full_order_ids'] = full_order_ids
                suspect['suffixes'] = suffixes

        full_grouped, city_grouped = match_orders_with_suspects(orders, suspects)
        return jsonify({
            'totalOrders': len(orders),
            'fullMatches': full_grouped,
            'cityMatches': city_grouped
        })
    finally:
        # Best-effort cleanup of the temporary upload.
        try:
            if save_path.exists():
                save_path.unlink()
        except Exception as e:
            app.logger.warning(f'Failed to delete temp file: {str(e)}')
# ----------------- 运行 -----------------
if __name__ == '__main__':
    # For production, run under Gunicorn or uWSGI instead of this dev server.
    # Debug mode is on only when FLASK_DEBUG equals "true" (case-insensitive).
    flask_debug = os.getenv('FLASK_DEBUG', 'False')
    app.run(host='0.0.0.0', port=5000, debug=(flask_debug.lower() == 'true'))