影刀RPA数据清洗实战:从脏数据到可用数据的转化之路
·
拼多多店群自动化上架方案
影刀RPA数据清洗实战:从脏数据到可用数据的转化之路
作者:林焱
采集回来的数据很少能直接用——空值、重复、格式混乱、编码错误、异常值……如果不做清洗就直接分析或入库,得出的结论毫无价值。数据清洗通常占数据分析工作量的60%-80%,而影刀RPA配合Python可以高效自动化这个过程。这篇文章系统讲解数据清洗的各种场景和解决方案。

一、脏数据的七大类型
| 类型 | 示例 | 影响 |
|---|---|---|
| 缺失值 | 价格为空、电话为None | 统计偏差、计算报错 |
| 重复数据 | 同一商品出现多次 | 统计虚高 |
| 格式混乱 | “¥5999” “5,999” “5999.00” | 无法计算 |
| 编码错误 | 乱码、“锟斤拷” | 文本不可读 |
| 异常值 | 价格=0或价格=999999 | 分析失真 |
| 不一致 | “北京”/“北京市”/“BJ” | 聚合错误 |
| 结构错误 | 多值在一个字段 “红,蓝,绿” | 无法筛选 |
二、缺失值处理
2.1 识别缺失值

def 识别缺失值(data):
"""统计数据中的缺失值"""
missing_report = {}
for key in data[0].keys():
missing_count = sum(1 for item in data if not item.get(key) or item[key] in ["", None, "null", "NULL", "N/A", "-"])
missing_pct = missing_count / len(data) * 100
missing_report[key] = {
"缺失数": missing_count,
"缺失率": f"{missing_pct:.1f}%"
}
return missing_report
# 示例
data = [
{"name": "iPhone", "price": 5999, "stock": 100},
{"name": "", "price": None, "stock": 0},
{"name": "华为", "price": "N/A", "stock": None},
]
report = 识别缺失值(data)
for field, info in report.items():
print(f"{field}:缺失{info['缺失数']}条({info['缺失率']})")
2.2 缺失值填充策略
def 填充缺失值(data, strategy):
"""根据策略填充缺失值"""
for item in data:
for key, fill_strategy in strategy.items():
value = item.get(key)
# 判断是否缺失
if value is None or value in ["", "null", "NULL", "N/A", "-"]:
if fill_strategy == "zero":
item[key] = 0
elif fill_strategy == "empty_string":
item[key] = ""
elif fill_strategy == "mean":
# 用该列的平均值填充
values = [item[key] for item in data if isinstance(item.get(key), (int, float))]
item[key] = sum(values) / len(values) if values else 0
elif fill_strategy == "mode":
# 用众数填充
from collections import Counter
values = [item[key] for item in data if item.get(key) not in [None, "", "N/A"]]
if values:
item[key] = Counter(values).most_common(1)[0][0]
elif fill_strategy == "forward_fill":
# 用前一个有效值填充
pass # 需要在循环中处理
elif isinstance(fill_strategy, str) and fill_strategy.startswith("default:"):
# 用固定默认值
item[key] = fill_strategy.replace("default:", "")
elif callable(fill_strategy):
# 用函数计算
item[key] = fill_strategy(item)
return data
# 策略配置
strategy = {
"price": "mean", # 价格用平均值
"stock": "zero", # 库存用0
"category": "default:未分类", # 分类用"未分类"
"name": "empty_string", # 名称用空字符串
}
cleaned_data = 填充缺失值(data, strategy)
三、重复数据处理

3.1 完全去重
def 完全去重(data):
"""基于所有字段的完全去重"""
seen = set()
unique = []
for item in data:
# 将字典转为可哈希的元组
key = tuple(sorted(item.items()))
if key not in seen:
seen.add(key)
unique.append(item)
print(f"去重:{len(data)} → {len(unique)}(删除{len(data) - len(unique)}条)")
return unique
3.2 基于关键字段去重
def 关键字段去重(data, keys, keep="first"):
"""基于指定字段去重"""
seen = {}
for item in data:
key = tuple(item.get(k, "") for k in keys)
if key not in seen:
seen[key] = item
elif keep == "last":
seen[key] = item
elif keep == "max_price":
# 保留价格更高的
if item.get("price", 0) > seen[key].get("price", 0):
seen[key] = item
result = list(seen.values())
print(f"去重(字段:{keys}):{len(data)} → {len(result)}")
return result
# 使用:基于商品名称+链接去重,保留价格更高的
cleaned = 关键字段去重(data, keys=["name", "url"], keep="max_price")
3.3 模糊去重

def 模糊去重(data, key_field, threshold=0.8):
"""基于文本相似度的模糊去重"""
from difflib import SequenceMatcher
def similarity(a, b):
return SequenceMatcher(None, str(a), str(b)).ratio()
unique = []
removed = 0
for item in data:
text = item.get(key_field, "")
is_duplicate = False
for existing in unique:
if similarity(text, existing.get(key_field, "")) >= threshold:
is_duplicate = True
removed += 1
break
if not is_duplicate:
unique.append(item)
print(f"模糊去重:{len(data)} → {len(unique)}(删除{removed}条相似数据)")
return unique
四、格式标准化
4.1 价格格式统一
import re
def 标准化价格(price_str):
"""将各种价格格式统一为浮点数"""
if not price_str or price_str in ["N/A", "-", "暂无报价"]:
return 0.0
# 转为字符串处理
price_str = str(price_str)
# 去掉货币符号
price_str = re.sub(r'[¥¥$€]', '', price_str)
# 去掉千分位逗号
price_str = price_str.replace(',', '')
# 去掉空格
price_str = price_str.strip()

# 提取数字部分
match = re.search(r'[\d.]+', price_str)
if match:
try:
return float(match.group())
except ValueError:
return 0.0
return 0.0
# 测试
test_prices = ["¥5,999.00", "5999", "$99.9", "价格:3999元", "暂无报价", "¥12,345.67", None]
for p in test_prices:
print(f"{p} → {标准化价格(p)}")
4.2 日期格式统一
from datetime import datetime
def 标准化日期(date_str, output_format="%Y-%m-%d"):
"""将各种日期格式统一"""
if not date_str or date_str in ["N/A", "-", ""]:
return ""
date_str = str(date_str).strip()
# 常见日期格式列表
formats = [
"%Y-%m-%d", # 2024-03-15
"%Y/%m/%d", # 2024/03/15
"%Y年%m月%d日", # 2024年03月15日
"%d/%m/%Y", # 15/03/2024
"%m-%d-%Y", # 03-15-2024
"%Y%m%d", # 20240315
"%b %d, %Y", # Mar 15, 2024
"%d %b %Y", # 15 Mar 2024
]
for fmt in formats:
try:
dt = datetime.strptime(date_str, fmt)
return dt.strftime(output_format)
except ValueError:
continue
# 尝试用正则提取
match = re.search(r'(\d{4})[年/\-.](\d{1,2})[月/\-.](\d{1,2})', date_str)
if match:
try:
dt = datetime(int(match.group(1)), int(match.group(2)), int(match.group(3)))
return dt.strftime(output_format)
except:
pass
return date_str # 无法识别则原样返回
4.3 手机号标准化
def 标准化手机号(phone_str):
"""标准化手机号"""
if not phone_str:
return ""
# 只保留数字
digits = re.sub(r'\D', '', str(phone_str))
# 去掉国际区号
if digits.startswith("86") and len(digits) == 13:
digits = digits[2:]
# 验证是否为有效手机号
if re.match(r'^1[3-9]\d{9}$', digits):
return digits
return phone_str # 无效号码原样返回
4.4 地址标准化
def 标准化地址(address):
"""标准化地址(统一省份名称)"""
if not address:
return ""
# 省份名称映射
province_map = {
"北京": "北京市", "上海": "上海市", "天津": "天津市", "重庆": "重庆市",
"广东": "广东省", "浙江": "浙江省", "江苏": "江苏省",
"BJ": "北京市", "SH": "上海市", "GZ": "广东省",
}

# 替换简称
for short, full in province_map.items():
if address.startswith(short) and not address.startswith(full):
address = address.replace(short, full, 1)
break
return address
五、异常值检测
5.1 统计方法检测
TEMU店群如何管理运营?
def 检测异常值(data, field, method="iqr"):
"""检测数据中的异常值"""
values = [item.get(field, 0) for item in data if isinstance(item.get(field), (int, float))]
if not values:
return []
outliers = []
if method == "iqr":
# IQR方法
values_sorted = sorted(values)
n = len(values_sorted)
q1 = values_sorted[n // 4]
q3 = values_sorted[3 * n // 4]
iqr = q3 - q1
lower = q1 - 1.5 * iqr
upper = q3 + 1.5 * iqr
outliers = [item for item in data if isinstance(item.get(field), (int, float)) and (item[field] < lower or item[field] > upper)]
elif method == "zscore":
# Z-Score方法
import statistics
mean = statistics.mean(values)
std = statistics.stdev(values) if len(values) > 1 else 0
if std > 0:
outliers = [item for item in data if isinstance(item.get(field), (int, float)) and abs((item[field] - mean) / std) > 3]
elif method == "range":
# 范围法
outliers = [item for item in data if isinstance(item.get(field), (int, float)) and (item[field] <= 0 or item[field] > 1000000)]
return outliers
# 使用:检测价格异常值
price_outliers = 检测异常值(data, "price", method="iqr")
print(f"价格异常值:{len(price_outliers)}条")
5.2 异常值处理

def 处理异常值(data, field, method="clip"):
"""处理异常值"""
values = [item.get(field, 0) for item in data if isinstance(item.get(field), (int, float))]
if not values:
return data
# 计算边界
values_sorted = sorted(values)
n = len(values_sorted)
q1 = values_sorted[n // 4]
q3 = values_sorted[3 * n // 4]
iqr = q3 - q1
lower = q1 - 1.5 * iqr
upper = q3 + 1.5 * iqr
for item in data:
value = item.get(field)
if not isinstance(value, (int, float)):
continue
if method == "clip":
# 截断:将异常值截断到边界
item[field] = max(lower, min(upper, value))
elif method == "remove":
# 删除:标记为无效
if value < lower or value > upper:
item[field] = None
elif method == "replace_median":
# 替换为中位数
if value < lower or value > upper:
median = values_sorted[n // 2]
item[field] = median
return data
六、完整数据清洗流水线
class DataCleaner:
"""数据清洗流水线"""
def __init__(self, config=None):
self.config = config or {}
self.stats = {"input": 0, "output": 0, "removed": 0, "fixed": 0}
def clean(self, data):
"""执行完整清洗流水线"""
self.stats["input"] = len(data)
# 1. 去除完全空行
data = self._remove_empty_rows(data)
# 2. 缺失值填充
data = self._fill_missing(data)
# 3. 格式标准化
data = self._standardize_formats(data)
# 4. 去重
data = self._deduplicate(data)
# 5. 异常值处理
data = self._handle_outliers(data)
# 6. 最终校验
data = self._final_validation(data)
self.stats["output"] = len(data)
self.stats["removed"] = self.stats["input"] - self.stats["output"]
self._print_report()
return data
def _remove_empty_rows(self, data):
"""移除所有字段都为空的行"""
return [item for item in data if any(v for v in item.values() if v not in [None, "", "N/A"])]
def _fill_missing(self, data):
"""填充缺失值"""
strategy = self.config.get("fill_strategy", {})
return 填充缺失值(data, strategy)
def _standardize_formats(self, data):
"""格式标准化"""
for item in data:
if "price" in item:
item["price"] = 标准化价格(item["price"])
if "date" in item:
item["date"] = 标准化日期(item["date"])
if "phone" in item:
item["phone"] = 标准化手机号(item["phone"])
if "address" in item:
item["address"] = 标准化地址(item["address"])
self.stats["fixed"] += 1
return data
def _deduplicate(self, data):
"""去重"""
keys = self.config.get("dedup_keys", ["name"])
return 关键字段去重(data, keys=keys)
def _handle_outliers(self, data):
"""异常值处理"""
for field in self.config.get("outlier_fields", []):
data = 处理异常值(data, field, method="clip")
return data
def _final_validation(self, data):
"""最终校验"""
valid = []
for item in data:
# 检查必填字段
required = self.config.get("required_fields", [])
if all(item.get(f) for f in required):
valid.append(item)
return valid
def _print_report(self):
"""打印清洗报告"""
print(f"""数据清洗报告:
输入:{self.stats['input']}条
输出:{self.stats['output']}条
删除:{self.stats['removed']}条
修正:{self.stats['fixed']}处
有效率:{self.stats['output']/self.stats['input']*100:.1f}%""")
# 使用
config = {
"fill_strategy": {"price": "mean", "category": "default:未分类"},
"dedup_keys": ["name", "url"],
"outlier_fields": ["price"],
"required_fields": ["name", "price"],
}
cleaner = DataCleaner(config)
clean_data = cleaner.clean(raw_data)

总结
数据清洗的核心步骤与工具:
| 步骤 | 目的 | 关键方法 |
|---|---|---|
| 缺失值处理 | 补全数据 | 均值/众数/默认值填充 |
| 去重 | 消除重复 | 完全去重/关键字段去重/模糊去重 |
| 格式标准化 | 统一格式 | 正则提取/映射表/解析器 |
| 异常值处理 | 消除噪声 | IQR/Z-Score/截断/替换 |
| 校验 | 确保质量 | 必填检查/范围检查/一致性检查 |
数据清洗的原则:宁可多删不错留,宁可保守不冒险。不确定的数据先标记,不要擅自修改或删除,保留原始数据供回溯。

作者:林焱 | 觉得有用就收藏,后续分享更多影刀RPA实战技巧
更多推荐



所有评论(0)