本章将介绍C#多任务在数据统计分析上的应用。另外,作者从一个侧面表达一下:NLP中的计算量是相当“恐怖”的。
一、基本情况
原始语料分为两个部分:(1)语料库;(2)字典库。
其中语料数据总计10,101,283条,总计977,485,287个字符。字典不重复记录12,926,204条。
通过对语料库的统计分析,获得字符共计12692个Unicode(包括特殊字符)。
通过对语料库的统计分析,共获得48,218,815不同的“短句”。所谓短句就是完整句子中相对完整的一部分。
例如:
“你根本不知道你自己的价值,”柯林斯太太笑着说.“像你这样直率的人抵得上一百个谈吐温文尔雅的风流男子。”
那么可以从这个句子中获得三个短句:“你根本不知道你自己的价值”,“柯林斯太太笑着说”,“像你这样直率的人抵得上一百个谈吐温文尔雅的风流男子”。
这些短句的长度分布情况如下:
字符串长度(按照Unicode计算) | 总数 |
---|---|
1 | 5563 |
2 | 277200 |
3 | 1082830 |
4 | 3156576 |
5 | 3727422 |
6 | 5092530 |
7 | 5006767 |
8 | 4829769 |
9 | 4301055 |
10 | 3766264 |
11 | 3196160 |
…… | …… |
674 | 1 |
904 | 1 |
1419 | 1 |
从表中可以看出,刚开始随字符串长度增加,总数逐渐增加到最大值。随后随长度增加而逐步递减。
二、统计处理
这里所需要进行的统计分成好几个部分。先来介绍下一些最基本的分析。
(1)字符统计
这个部分的统计最为简单:从原始语料库中提取一行数据,逐个字符进行分析。
如果字符在Dictionary<string,int>对象中,可以查到,则计数器加1;否则,则增加一条新的记录。
这个部分的统计对内存和CPU的消耗并不高,很快就可以完成。
(2)拆分短句
这个部分的处理其实也不复杂:从原始语料库中提取一行数据,按照标点符号进行分段。各个分段部分的数据,可视为“短句”。或者只提取中文部分即可,如下面代码所示:
Match match = Regex.Match(item, @"[\u4E00-\u9FA5]+");
为了更加精确地分析,作者在整个过程中,增加了数量词提取过程。毕竟有不少语料中存在数量词(包括日期),这些词会干扰到短句的切分过程。
2008年9月20日本报讯……
在上面的例子中,如果只提取中文,就会跳过数字把“日”字和“本”字绑到一起,与原本的意思相违背。这种情况还真有不少。
为了能准确提取数量词,需要建立大量的正则规则模板。通过多次匹配和融合,才能较为准确地提取数量词。
private static readonly string[] QUANTITY_RULES =
{
////////////////////////////////////////////////////////////////////////////////
//
// 加载正则规则。
//
////////////////////////////////////////////////////////////////////////////////
// 正则表达式
// 编号
"0\\d*", //编号
"No.\\d*", //编号
// 整数
"-?[1-9]\\d*", //整数
"[1-9]\\d*[\\s]?-[\\s]?[1-9]\\d*", //整数
"[1-9]\\d{0,2}(,\\d{3})+", //整数
"[1-9]\\d*[\\s]?[十|百|千|万|亿]", //整数
"[1-9]\\d*[\\s]?[十|百|千|万|亿|兆]亿", //整数
// 百分数
"\\d+(\\.\\d+)?%", //百分数
"\\d+(\\.\\d+)?%-\\d+(\\.\\d+)?%", //百分数
// 分数
"[1-9]\\d*[\\s]?/[\\s]?[1-9]\\d*", //分数
"[1-9]\\d*[\\s]?/[\\s]?[1-9]\\d*-[1-9]\\d*/[1-9]\\d*", //分数
// 英文单词
"[A-Za-z][\'A-Za-z]*", //英文
// 浮点数
"-?([1-9]\\d*\\.\\d+|0\\.\\d*[1-9]\\d*|0?\\.0+)", //浮点数
"([1-9]\\d*\\.\\d+|0\\.\\d*[1-9]\\d*|0?\\.0+)[\\s]?[十|百|千|万|亿]", //浮点数
"([1-9]\\d*\\.\\d+|0\\.\\d*[1-9]\\d*|0?\\.0+)[\\s]?[十|百|千|万|亿|兆]亿", //浮点数
// 日期
"(\\d{4}|\\d{2})[\\s]?年", //日期
"(0?[1-9]|1[0-2])[\\s]?月", //日期
"((0?[1-9])|((1|2)[0-9])|30|31)[\\s]?号", //日期
"((0?[1-9])|((1|2)[0-9])|30|31)[\\s]?日", //日期
"(\\d{4}|\\d{2})[\\s]?年[\\s]?((1[0-2])|(0?[1-9]))[\\s]?月", //日期
"(0?[1-9]|1[0-2])[\\s]?月((0?[1-9])|((1|2)[0-9])|30|31)[\\s]?日", //日期
"(\\d{4}|\\d{2})-((1[0-2])|(0?[1-9]))[\\s]?-[\\s]?(([12][0-9])|(3[01])|(0?[1-9]))", //日期
"(\\d{4}|\\d{2})[\\s]?年((1[0-2])|(0?[1-9]))[\\s]?月(([12][0-9])|(3[01])|(0?[1-9]))[\\s]?日", //日期
// 时间
"((1|0?)[0-9]|2[0-3])[\\s]?时", //时间
"((1|0?)[0-9]|2[0-3])[\\s]?:[\\s]?([0-5][0-9])", //时间
"((1|0?)[0-9]|2[0-3])[\\s]?时([0-5][0-9])[\\s]?分", //时间
"((1|0?)[0-9]|2[0-3])[\\s]?点([0-5][0-9])[\\s]?分", //时间
"((1|0?)[0-9]|2[0-3])[\\s]?:[\\s]?([0-5][0-9])[\\s]?:[\\s]?([0-5][0-9])", //时间
"((1|0?)[0-9]|2[0-3])[\\s]?时[\\s]?([0-5][0-9])分([0-5][0-9])[\\s]?秒", //时间
"((0?[1-9])|((1|2)[0-9])|30|31)[\\s]?日[\\s]?((1|0?)[0-9]|2[0-3])[\\s]?时[\\s]?([0-5][0-9])[\\s]?分", //时间
// 时间段
"(\\d{4}|\\d{2})[\\s]?年[\\s]?-[\\s]?(\\d{4}|\\d{2})[\\s]?年", //时间段
"(\\d{4}|\\d{2})[\\s]?/[\\s]?(\\d{4}|\\d{2})[\\s]?财年", //时间段
"((1|0?)[0-9]|2[0-3])[\\s]?[点|时][\\s]?-[\\s]?((1|0?)[0-9]|2[0-3])[\\s]?[点|时]", //时间段
"((0?[1-9])|((1|2)[0-9])|30|31)[\\s]?[日|号][\\s]?-[\\s]?((0?[1-9])|((1|2)[0-9])|30|31)[\\s]?[日|号]", //时间段
"(\\d{4}|\\d{2})年((1[0-2])|(0?[1-9]))[\\s]?月[\\s]?-[\\s]?(\\d{4}|\\d{2})年((1[0-2])|(0?[1-9]))[\\s]?月", //时间段
"((1[0-2])|(0?[1-9]))[\\s]?月[\\s]?(([12][0-9])|(3[01])|(0?[1-9]))[\\s]?日[\\s]?-[\\s]?((1[0-2])|(0?[1-9]))[\\s]?月[\\s]?(([12][0-9])|(3[01])|(0?[1-9]))[\\s]?日", //时间段
"(\\d{4}|\\d{2})[\\s]?年[\\s]?((1[0-2])|(0?[1-9]))[\\s]?月[\\s]?(([12][0-9])|(3[01])|(0?[1-9]))[\\s]?日[\\s]?-[\\s]?(\\d{4}|\\d{2})[\\s]?年[\\s]?((1[0-2])|(0?[1-9]))[\\s]?月[\\s]?(([12][0-9])|(3[01])|(0?[1-9]))[\\s]?日", //时间段
// 特殊标识
// 身份证号码
"\\d{15}(\\d\\d[0-9xX])?", //身份证
// 国内电话号码(易混淆)
"(\\d{4}|\\d{3})?[\\s]?-[\\s]?(\\d{8}|\\d{7})", //电话
"(\\d{4}|\\d{3})?[\\s]?-[\\s]?(\\d{4}|\\d{3})?-[\\s]?(\\d{4}|\\d{3})", //电话
// HTML标记(易混淆、有死循环可能)
// "<(.*)(.*)>.*<\\/\\1>|<(.*) \\/>", //HTML标记
// EMail地址
"\\w+([-+.]\\w+)*@\\w+([-.]\\w+)*\\.\\w+([-.]\\w+)*", //电子邮箱
// IP地址
"((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)", //IP地址
// URL地址(易混淆)
"\\b(([\\w-]+://?|www[.])[^\\s()<>]+(?:\\([\\w\\d]+\\)|([^[:punct:]\\s]|/)))", //URL地址
////////////////////////////////////////////////////////////////////////////////
//
// 加载数词规则。
//
////////////////////////////////////////////////////////////////////////////////
// 数词
"$n[\\.|\\)]", //编号
"“[\\s]?$n[\\s]?”", //编号
"[0-9][\\dA-Za-z]*", //编号
"$e+$n", //编号
"$e+$n$e*", //编号
"“[\\s]?$n[\\s]?·$n[\\s]?”", //编号
"\\([\\s]?$n[\\s]?\\)|\\<[\\s]?$n[\\s]?\\>|\\{[\\s]?$n[\\s]?\\}", //编号
"$d[多|余](万|亿)", //数量
"$d[\\s]?:[\\s]?$d", //比例
"$d[\\s]?(~|至)[\\s]?$d", //区间
"(零|$c){2,}", //序号
"十$c", //十位数
"$c十$c?", //十位数
"$c百(零|$c十)?$c?", //百位数
"$c千((零|$c百)?(零|$c十)?$c?)", //千位数
"$c千((零|$c百)?(零|$c十)?$c?)万", //万位数
"$c万((零|$c千)?((零|$c百)?(零|$c十)?$c?))", //万位数
"$f个百分点", //百分点
"百分之零点(几|(零|$c)+)", //百分数
"百分之(($c?百)|((($c十)|十)?$c?))", //百分数
"百分之(($c?百)|((($c十)|十)?$c?))($c|几|左右)?", //百分数
////////////////////////////////////////////////////////////////////////////////
//
// 加载序号规则。
//
////////////////////////////////////////////////////////////////////////////////
"第$d", //序数
"第[\\s]?$d[\\s]?$q", //序数
"第$c", //序数
"第[\\s]?$c[\\s]?$q", //序数
"第十$c?", //序数
"第[\\s]?十$c?[\\s]?$q", //序数
"第$c十$c?", //序数
"第[\\s]?$c十$c?[\\s]?$q", //序数
"第$c百(零|$c十)?$c?", //序数
"第[\\s]?$c百(零|$c十)?$c?[\\s]?$q", //序数
////////////////////////////////////////////////////////////////////////////////
//
// 加载数量词规则。
//
////////////////////////////////////////////////////////////////////////////////
"$d($q|$u)", //数量
"$d[\\s]?[多|余]($q|$u)", //约数
"几(亿|万|千|百|十)[\\s]?($q|$u)", //约数
"$c[\\s]?($q|$u)", //数量
"十$c?[\\s]?($q|$u)", //数量
"$c十$c?[\\s]?($q|$u)", //数量
"$c百(零|$c十)?$c?[\\s]?($q|$u)", //数量
"$c千(零|$c百)?(零|$c十)?$c?[\\s]?($q|$u)", //数量
"(十|$c)万(零|$c千)?(零|$c百)?(零|$c十)?$c?[\\s]?($q|$u)", //数量
"$f[\\s]?($v)", //数量
////////////////////////////////////////////////////////////////////////////////
//
// 加载约数规则。
//
////////////////////////////////////////////////////////////////////////////////
"$f[\\s]?(亿|万|千|百)?(余|多)?($q|$u)", //约数
"(十|$c)[\\s]?(亿|万|千|百|十)(余|多)?($q|$u)", //约数
////////////////////////////////////////////////////////////////////////////////
//
// 加载货币规则。
//
////////////////////////////////////////////////////////////////////////////////
"$f[\\s]?(百|千|万)?(亿|万)?(余|多)?$y", //货币
"$f[\\s]?[多|余]?(百|千|万)?(亿|万)?(余|多)?$y", //货币
"[1-9]\\d{0,2}(,\\d{3})+(百|千|万)?(亿|万)?(余|多)?$y", //货币
////////////////////////////////////////////////////////////////////////////////
//
// 加载时间规则。
//
////////////////////////////////////////////////////////////////////////////////
// 时间
"[1-9]\\d*[\\s]?个月", //月
"[过]?年[前|中|底|后]", //年
"(第)?(一|二|三|四)季[度]?", //季度
"周[一|二|三|四|五|六|日]", //周
"星期[一|二|三|四|五|六|日]", //星期
"((1[0-2])|(0?[1-9]))[\\s]?月份", //月份
"([1-9]\\d*[\\s]?、)+([1-9]\\d*[\\s]?)(一|二|三|四|五|六|七|八|九|十|十一|十二)个月", //月
"(((0?[1-9])|((1|2)[0-9])|30|31)[\\s]?、)*((0?[1-9])|((1|2)[0-9])|30|31)[\\s]?日", //日期
"((0?[1-9])|((1|2)[0-9])|30|31)[\\s]?至[\\s]?((0?[1-9])|((1|2)[0-9])|30|31)[\\s]?日", //日期
"((1[0-2])|(0?[1-9]))[\\s]?月[\\s]?(((0?[1-9])|((1|2)[0-9])|30|31)[\\s]?日)(、((0?[1-9])|((1|2)[0-9])|30|31)[\\s]?日)*", //日期
////////////////////////////////////////////////////////////////////////////////
//
// 加载其他规则。
//
////////////////////////////////////////////////////////////////////////////////
// 特殊词汇
"[双|两|叁|多][\\s]?(方|点|个|项|次|根|颗|条|名|套|份)", //数量
// 航班
"$e+$n次[\\s]?航班", //航班
// 股票
"(A|B|H)股", //股票
// 油料
"$n#", //油料
"$f个油", //油耗
// 维生素
"维生素(A|B|C|D|E|K|H|P|M|T|U)\\d{0,2}", //维生素
};
在随后的实际运行中,就发现处理速度十分缓慢。初步估算了一下,要将语料(10,101,283条)全部处理完成,至少需要三天的时间。
通过Visual Studio Profiler探测工具对CPU的运行情况做了检测,最后发现85%的运行时间都被分配到了该函数:
private static Quantity[] GetMatches(string content)
{
// 创建对象
List<Quantity> quantities = new List<Quantity>();
// 循环处理
foreach(string rule in QUANTITY_RULES)
{
// 恢复规则
string newRule = Quantity.RecoverRule(rule);
// 检查匹配
// 此函数非常消耗时间!!!
Match match = Regex.Match(content, newRule);
// 检查结果
while(match.Success)
{
// 创建对象
Quantity quantity =
new Quantity();
// 设置数值
quantity.Index = match.Index;
quantity.Value = match.Value;
quantity.Length = match.Length;
// 加入列表
quantities.Add(quantity);
// 下一个
match = match.NextMatch();
}
}
// 按照索引数值排序
quantities = quantities.OrderBy(q => q.Index).ToList();
// 返回结果
return quantities.ToArray();
}
其中以Regex.Match占用时间最多。这个可是内置系统函数,作者表示无能为力。
不过随后发现:程序运行的CPU占比并不高,大约只有12%左右。于是就想尝试下多线程并行处理。最后终于在网络上找到了合适的解决方案。这里需要用到一个类LimitedConcurrencyLevelTaskScheduler:
using System;
using System.Text;
using System.Data;
using System.Threading;
using System.Data.SqlClient;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Text.RegularExpressions;
namespace Misc
{
// Provides a task scheduler that ensures a maximum concurrency level while
// running on top of the thread pool.
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
// Indicates whether the current thread is processing work items.
[ThreadStatic]
private static bool _currentThreadIsProcessingItems;
// The list of tasks to be executed
private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
// The maximum concurrency level allowed by this scheduler.
private readonly int _maxDegreeOfParallelism;
// Indicates whether the scheduler is currently processing work items.
private int _delegatesQueuedOrRunning = 0;
// Creates a new instance with the specified degree of parallelism.
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
_maxDegreeOfParallelism = maxDegreeOfParallelism;
}
// Queues a task to the scheduler.
protected sealed override void QueueTask(Task task)
{
// Add the task to the list of tasks to be processed. If there aren't enough
// delegates currently queued or running to process tasks, schedule another.
lock (_tasks)
{
_tasks.AddLast(task);
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
{
++_delegatesQueuedOrRunning;
NotifyThreadPoolOfPendingWork();
}
}
}
// Inform the ThreadPool that there's work to be executed for this scheduler.
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
// Note that the current thread is now processing work items.
// This is necessary to enable inlining of tasks into this thread.
_currentThreadIsProcessingItems = true;
try
{
// Process all available items in the queue.
while (true)
{
Task item;
lock (_tasks)
{
// When there are no more items to be processed,
// note that we're done processing, and get out.
if (_tasks.Count == 0)
{
--_delegatesQueuedOrRunning;
break;
}
// Get the next item from the queue
item = _tasks.First.Value;
_tasks.RemoveFirst();
}
// Execute the task we pulled out of the queue
base.TryExecuteTask(item);
}
}
// We're done processing items on the current thread
finally { _currentThreadIsProcessingItems = false; }
}, null);
}
// Attempts to execute the specified task on the current thread.
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// If this thread isn't already processing a task, we don't support inlining
if (!_currentThreadIsProcessingItems) return false;
// If the task was previously queued, remove it from the queue
if (taskWasPreviouslyQueued)
// Try to run the task.
if (TryDequeue(task))
return base.TryExecuteTask(task);
else
return false;
else
return base.TryExecuteTask(task);
}
// Attempt to remove a previously scheduled task from the scheduler.
protected sealed override bool TryDequeue(Task task)
{
lock (_tasks) return _tasks.Remove(task);
}
// Gets the maximum concurrency level supported by this scheduler.
public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }
// Gets an enumerable of the tasks currently scheduled on this scheduler.
protected sealed override IEnumerable<Task> GetScheduledTasks()
{
bool lockTaken = false;
try
{
Monitor.TryEnter(_tasks, ref lockTaken);
if (lockTaken) return _tasks;
else throw new NotSupportedException();
}
finally
{
if (lockTaken) Monitor.Exit(_tasks);
}
}
}
}
通过这个类可以实现控制线程池的大小。
public static void ExtractSentences(bool flag)
{
// 记录日志
Log.LogMessage("SentenceExtractor", "ExtractSentences", "提取句子数据!");
//指令字符串
string cmdString =
"SELECT [rid], [content] FROM [dbo].[RawContent];";
// 创建数据库连接
SqlConnection sqlConnection = new SqlConnection(Common.CONNECT_STRING);
try
{
// 计数器
int taskCount = 0;
// 同步锁对象
object lockObject = new object();
// 任务数组
List<Task> tasks = new List<Task>();
// 生成任务控制器
TaskFactory factory = new TaskFactory(
new LimitedConcurrencyLevelTaskScheduler(Common.MAX_THREADS));
// 计数器
int total = 0;
// 开启数据库连接
sqlConnection.Open();
// 创建指令
SqlCommand sqlCommand =
new SqlCommand(cmdString, sqlConnection);
// 创建数据阅读器
SqlDataReader reader = sqlCommand.ExecuteReader();
// 循环处理
while (reader.Read())
{
// 增加计数
total++;
// 检查计数
if (total % 10000 == 0)
{
// 打印记录
Log.LogMessage(string.Format("{0} items processed !", total));
}
// 获得编号
int rid = reader.GetInt32(0);
// 获得内容
string content = reader.GetString(1);
// 检查内容
if (content == null || content.Length <= 0) continue;
// 启动进程
tasks.Add(factory.StartNew
(() =>
{
lock (lockObject)
{
// 增加计数
taskCount++;
// 检查计数
if (taskCount % 10000 == 0)
{
// 打印记录
Log.LogMessage(string.Format("{0} tasks finised !", taskCount));
}
}
// 清理内容
content = MiscTool.ClearContent(content);
// 检查内容
if (content == null || content.Length <= 0) return;
// 切分字符串
string[] output = SplitContent(content);
// 检查结果
if (output == null || output.Length <= 0) return;
// 合并字符串
output = MergeContent(output);
// 检查结果
if (output == null || output.Length <= 0) return;
// 获得拆分结果
string[][] sentences = GetSentences(output);
// 检查结果
if (sentences == null || sentences.Length <= 0) return;
// 检查标记位
if (flag)
{
// 循环处理
foreach (string[] sentence in sentences)
{
// 加入记录
SentenceContent.AddSentence(Concatenate(sentence), rid);
}
}
}));
}
// 关闭数据阅读器
reader.Close();
// 记录日志
Log.LogMessage(string.Format("\tcontent.count = {0}", total));
// 记录日志
Log.LogMessage("SentenceExtractor", "ExtractSentences", "等待所有任务完成 !");
// 等待所有任务完成
Task.WaitAll(tasks.ToArray());
}
catch (System.Exception ex)
{
// 记录日志
Log.LogMessage("SentenceExtractor", "ExtractSentences", "unexpected exit !");
Log.LogMessage(string.Format("\texception.message = {0}", ex.Message));
}
finally
{
// 检查状态并关闭连接
if (sqlConnection.State == ConnectionState.Open) sqlConnection.Close();
}
// 记录日志
Log.LogMessage("SentenceExtractor", "ExtractSentences", "句子提取完毕!");
}
统计程序最终一口气启动了1000多万个任务,然后WaitAll去完成。这正是为搜刮计算机的资源绞尽脑汁。
最后实际运行的时候,CPU整体消耗终于爬到80%以上(留点余地做其他事情)。大约16个小时左右就完成了统计任务,比之前的方式要快6倍。
一口气启动1000多万个任务,还不是最夸张的事情!
(3)统计短句频次
到了这里,才是真正遇到了“瓶颈”。要分析48,218,815条不同的短句在语料(10,101,283条)中的频次,这个匹配运算数量级几乎要上天!
为了提高运行效率,照上面的方法使用多任务。但是此次没有同时开启4800多万个任务。而是要控制任务队列的数量。为此增加了如下一段代码:
// 检查任务队列长度
while(tasks.Count > 10000000)
{
// 记录日志
Log.LogMessage("SentenceStatistic", "MakeStatistic", "任务已满,需要等待全部结束!");
// 等待全部任务结束
Task.WaitAll(tasks.ToArray()); tasks.Clear();
// 记录日志
Log.LogMessage("SentenceStatistic", "MakeStatistic", "任务全部结束,继续分配新任务!");
}
还是同时开启1000多万个任务,然后等待完成。这次由于匹配比较简单,没有采用Regex.Match函数,否则效率可能还要更低。
根据实际运行的效果,作者估算:完成全部的统计分析需要3年的时间!这样来看,不要开启过多的频次统计,否则计算量不是一般的大。现在需要决定:只统计多长的字符串就足够了。否则,这台设备是无法完成任务的。
三、疯狂的总结
在上述的内容中,做了一些比较疯狂的事情,但是Windows 11一切运行正常,并未出现任何问题。
- 一次性将48,218,815条短句装入到Dictionary<string, int>之中。
- 一次性开启1000万个任务,并且等待这些任务完成。
结果,这些方法技巧在计算量面前,就是个渣。还是要想办法从算法上来解决,光靠“暴力”是不行的。