Commit 2d4dee73 by mahaisong

fix:并行化,7秒执行完毕800W条记录,可以设置块大小,支持更高速读取1G 5G 10G 文件

parent db23a30e
......@@ -54,6 +54,7 @@
<Compile Include="ListModel.cs" />
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="V1.cs" />
<EmbeddedResource Include="Form1.resx">
<DependentUpon>Form1.cs</DependentUpon>
</EmbeddedResource>
......
......@@ -81,6 +81,7 @@ namespace CsvCount_ES
{
Stopwatch st = new Stopwatch();
st.Start();
int Btotal = 0;
if (!String.IsNullOrWhiteSpace(textBox1.Text))
{
......@@ -116,7 +117,7 @@ namespace CsvCount_ES
setup:
try
{
#region 基本用法--820W行数据,439MB,转成字符串、split后,执行消耗14~15秒。
#region 基本用法--820W行数据,439MB,转成字符串、split后,执行消耗24~25秒。
////读取文件
//FileInfo fi = new FileInfo(filepath);
//Int64 size = fi.Length;//byte长度
......@@ -191,26 +192,39 @@ namespace CsvCount_ES
#endregion
#region 并发用法:真正大文件时比StreamReader有用--820W行数据,439MB,转成字符串、split后,执行消耗7~8秒。 10K 1块。
//读取文件
FileInfo fi = new FileInfo(filepath);
Int64 size = fi.Length;//byte长度
int offset = 0;
int length = 512000;//每次块大小 512000byte 500KB 这样1个G转2010次算完
int length = 10240;//每次块大小 512000byte 500KB 这样1个G转2010次算完
//10兆字节(mb)=10485760字节(b)
//UTF-8编码中,一个英文字符等于一个字节,一个中文(含繁体)等于三个字节。 本次文件中主要是英文和数字,没有中文。所以要多少字就配多大块。
//50千字节(kb)=51200字节(b)
//200千字节(kb)=204800字节(b)
Int64 portion = (Int64)Math.Ceiling(size * 1.0 / length); //分成多少块
string fp2 = @filepath;
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(fp2, FileMode.Open, "mapname"))
//统一使用总文件大小的内存隐射对象
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(fp2, FileMode.Open, "mapname", size))
{
//从零开始,每块单独并行处理。
Parallel.For(0,portion,(index,ParallelLoopState)=>
{
//块大小和 文件总大小-第几分区位置的剩余位置,取两者较小的
Int64 fileSize = Math.Min(length, size - length * index);
if (fileSize > 0)
{
using (var accessor = mmf.CreateViewAccessor(offset, size))
using (var accessor = mmf.CreateViewAccessor(length * index, fileSize, MemoryMappedFileAccess.Read))
{
List<string> list = new List<string>();
//每隔100是1行
for (int i = 0; i < size; i += length)
{
Int64 listkey = ADD();
////每隔100是1行
//for (int i = 0; i < fileSize; i += length)
//{
//Int64 listkey = ADD();
......@@ -233,15 +247,15 @@ namespace CsvCount_ES
//resultcount:读入 array 的结构数。 如果可用结构较少,则此值可能小于 count;如果到达访问器末尾,则为零。
#endregion
int resultcount = accessor.ReadArray(i, byteArray, 0, length);
int resultcount = accessor.ReadArray(0, byteArray, 0, (int)fileSize);
//测试通过:\r\n \r 空格 \n(注意:\r 空格 \n 都算1个字符,\r\n算2个字符)
//规则:最后必须是\n结尾,cvs的换行都是\n结尾。
string str = System.Text.Encoding.Default.GetString(byteArray);
int index = str.LastIndexOf('\n');
int index1 = str.LastIndexOf('\n');
string[] k = str.Split('\n');
string endstr = null;
if (str.Length != index)
if (str.Length != index1)
{
//不相等,最后一行不能Split分析。
//spilt时,将最后一行保存起来,本次处理索引-1。
......@@ -277,12 +291,20 @@ namespace CsvCount_ES
//头尾:另外保存
ListModel listmodel = new ListModel(k[0].Replace("\0", string.Empty).Replace("\r", string.Empty).Trim(), endstr);
listModelDic.TryAdd(listkey, listmodel);
listModelDic.TryAdd(index, listmodel);
}
//}
}
//分区时,最后一个为0则忽略。
}
});
}
if (listModelDic.Count() > 0)
......@@ -305,12 +327,11 @@ namespace CsvCount_ES
else
{
//头尾单独链式处理: 从1开始。第0行的begin 是列名称,不需要分析。
//注意这里是用i标识key,而key是从1开始的,i=2,i-1就等于1,即,key=1的值。同理。listModelDic.Count()+1为901,i为900时,计算899end和900的begin。
for (int i = 2; i < listModelDic.Count() + 1; i++)
for (int i = 1; i < listModelDic.Count() ; i++)
{
try
{
count = count + 1;//记录计算了多少次,与分块对应(其实就是与listModelDic的key-1对应即代表完整)
//count = count + 1;//记录计算了多少次,与分块对应(其实就是与listModelDic的key-1对应即代表完整)
string str = "";
if (!String.IsNullOrWhiteSpace(listModelDic[i - 1].End))
......@@ -343,6 +364,7 @@ namespace CsvCount_ES
}
}
#endregion
}
catch (IOException ex)
{
......@@ -366,13 +388,12 @@ namespace CsvCount_ES
MessageBox.Show(ex.ToString());
}
#endregion
st.Stop();
MessageBox.Show("运行时间:" + st.ElapsedMilliseconds.ToString());
#region 完整性测试---使用时注释掉。
#region 完整性测试---使用时注释掉。 执行效率 5-6
try
{
......@@ -408,18 +429,26 @@ namespace CsvCount_ES
swtp.Stop();
MessageBox.Show("完整性测试运行时间:" + swtp.ElapsedMilliseconds.ToString());
#endregion
//可能出现重复,这里的条数与日志文件的条数可能不同。
foreach (var item in BtotalCountDic)
{
Btotal = Btotal + item.Value;
}
//全比较
var dz1 = BtotalCountDic.Except(totalCountDic);
//key比较
var dz2 = BtotalCountDic.Keys.Except(totalCountDic.Keys);
if (null == dz1.First() && null == dz2.First())
if (dz1.Count() == 0 && dz2.Count() == 0)
{
//两个方式结果一致。
MessageBox.Show("两个方式结果一致");
}
else
{
MessageBox.Show("完整性测试报错" );
MessageBox.Show("完整性测试报错");
}
}
catch (Exception exe1)
......@@ -445,6 +474,21 @@ namespace CsvCount_ES
button1.Enabled = true;
int total = 0;
foreach (var item in totalCountDic)
{
total = total+ item.Value;
}
if (total == Btotal)
{
MessageBox.Show("正确:点击数和为总条数");
}
else
{
MessageBox.Show("错误:点击数和不等于总条数");
}
MessageBox.Show("结果行数:" + count.ToString());
errorCount = 0;
......
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.Drawing;
using System.IO;
using System.IO.MemoryMappedFiles;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace CsvCount_ES
{
public class V1
{
//key:newsid字段,也是itemid字段,value:int-次数累计
private ConcurrentDictionary<string, int> totalCountDic = new ConcurrentDictionary<string, int>();
//链式结构 key: Int64 累加(对应分块的块号), value:这一块的ListModel的第一行和最后一行的string内容
private ConcurrentDictionary<Int64, ListModel> listModelDic = new ConcurrentDictionary<Int64, ListModel>();
private static Int64 sn = 0;
//警告 在 32 位 Intel 计算机上分配 64 位值不是原子操作;即该操作不是线程安全的。这意味着,如果两个人同时将一个值分配给一个静态 Int64 字段,则该字段的最终值是无法预测的。
/// <summary>
/// 得到编号加1的值
/// </summary>
/// <returns></returns>
public static Int64 ADD()
{
//新建线程,线程数加一
return Interlocked.Increment(ref sn);
}
public V1()
{
}
private void button2_Click(object sender, EventArgs e)
{
Stopwatch st = new Stopwatch();
st.Start();
int Btotal = 0;
int count = 0;
int errorCount = 0;
#region 执行内存映射方案
Application.DoEvents();
//输出测试
//for (int i = 0; i < 500; i++)
//{
// //跨线程访问
// this.BeginInvoke(new MethodInvoker(delegate ()
// {
// count = count + 1;
// this.listBox1.SuspendLayout();
// this.listBox1.Items.Add("开始:" + count.ToString() + "————————————————————————————————结束");
// this.listBox1.ResumeLayout();
// Application.DoEvents();
// }));
//}
string filepath = "";// openFileDialog1.FileName;
setup:
try
{
#region 基本用法--820W行数据,439MB,转成字符串、split后,执行消耗24~25秒。
////读取文件
//FileInfo fi = new FileInfo(filepath);
//Int64 size = fi.Length;//byte长度
//int offset = 0;
//int length = 512000;//每次块大小 512000byte 500KB 这样1个G转2010次算完
////UTF-8编码中,一个英文字符等于一个字节,一个中文(含繁体)等于三个字节。 本次文件中主要是英文和数字,没有中文。所以要多少字就配多大块。
//string fp2 = @filepath;
//using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(fp2, FileMode.Open, "mapname"))
//{
// using (var accessor = mmf.CreateViewAccessor(offset, size))
// {
// List<string> list = new List<string>();
// //每隔100是1行
// for (int i = 0; i < size; i += length)
// {
// byte[] byteArray = new byte[length];
// //ReadArray参数
// //position
// //Type: System.Int64
// //访问器中的字节偏移量,从此处开始读取。
// //array
// //Type: T[]
// //包含从访问器读取的结构的数组。
// //offset
// //Type: System.Int32
// //array 中要将第一个复制的结构放置到的索引。
// //count
// //Type: System.Int32
// //要从访问器读取的 T 类型的结构的数目。
// //resultcount:读入 array 的结构数。 如果可用结构较少,则此值可能小于 count;如果到达访问器末尾,则为零。
// int resultcount = accessor.ReadArray(i, byteArray, 0, length);
// //\r\n 空格 \n 都测试通过
// //\r 空格 \n 都算1个字符 规则:最后必须是\n结尾
// string str = System.Text.Encoding.Default.GetString(byteArray);
// int index = str.LastIndexOf('\n');
// string[] k = str.Split();
// count = count + k.Length;
// if (str.Length==index)
// {
// //最后一行可以分析
// }
// else
// {
// //str.IndexOf
// //不相等,最后一行不能分析。
// //spilt时,将最后一行保存起来,本次处理索引-1。
// //头条保存起来,也不处理,本次处理索引-2。(标志是否是第一条,异步时,第一个肯定是完整的,不需要存入)
// }
// }
// }
//}
#endregion
#region 并发用法:真正大文件时比StreamReader有用--820W行数据,439MB,转成字符串、split后,执行消耗7~8秒。 10K 1块。
//读取文件
FileInfo fi = new FileInfo(filepath);
Int64 size = fi.Length;//byte长度
int length = 10240;//每次块大小 512000byte 500KB 这样1个G转2010次算完
//10兆字节(mb)=10485760字节(b)
//UTF-8编码中,一个英文字符等于一个字节,一个中文(含繁体)等于三个字节。 本次文件中主要是英文和数字,没有中文。所以要多少字就配多大块。
//50千字节(kb)=51200字节(b)
//200千字节(kb)=204800字节(b)
Int64 portion = (Int64)Math.Ceiling(size * 1.0 / length); //分成多少块
string fp2 = @filepath;
//统一使用总文件大小的内存隐射对象
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(fp2, FileMode.Open, "mapname", size))
{
//从零开始,每块单独并行处理。
Parallel.For(0, portion, (index, ParallelLoopState) =>
{
//块大小和 文件总大小-第几分区位置的剩余位置,取两者较小的
Int64 fileSize = Math.Min(length, size - length * index);
if (fileSize > 0)
{
using (var accessor = mmf.CreateViewAccessor(length * index, fileSize, MemoryMappedFileAccess.Read))
{
List<string> list = new List<string>();
////每隔100是1行
//for (int i = 0; i < fileSize; i += length)
//{
//Int64 listkey = ADD();
byte[] byteArray = new byte[length];
#region 函数参数说明
//ReadArray参数
//position
//Type: System.Int64
//访问器中的字节偏移量,从此处开始读取。
//array
//Type: T[]
//包含从访问器读取的结构的数组。
//offset
//Type: System.Int32
//array 中要将第一个复制的结构放置到的索引。
//count
//Type: System.Int32
//要从访问器读取的 T 类型的结构的数目。
//resultcount:读入 array 的结构数。 如果可用结构较少,则此值可能小于 count;如果到达访问器末尾,则为零。
#endregion
int resultcount = accessor.ReadArray(0, byteArray, 0, (int)fileSize);
//测试通过:\r\n \r 空格 \n(注意:\r 空格 \n 都算1个字符,\r\n算2个字符)
//规则:最后必须是\n结尾,cvs的换行都是\n结尾。
string str = System.Text.Encoding.Default.GetString(byteArray);
int index1 = str.LastIndexOf('\n');
string[] k = str.Split('\n');
string endstr = null;
if (str.Length != index1)
{
//不相等,最后一行不能Split分析。
//spilt时,将最后一行保存起来,本次处理索引-1。
//头条保存起来,也不处理,本次处理索引-2。
endstr = k[k.Count() - 1].Replace("\0", string.Empty).Replace("\r", string.Empty).Trim(); ;
}
//else
//{
// //最后一行可以Split分析,不再处理
//}
//先简化开发,将头部和尾部都另外保存。只计算中间的。然后调用ES (索引-2:从1开始,尾部-1)
//测试: 820W条记录,执行完毕 20~ 22秒。 最终结果3w2013条记录。有的直接点击3000多次。
for (int j = 1; j < k.Count() - 1; j++)
{
try
{
string key = k[j].Split(',')[1].Replace("\0", string.Empty).Replace("\r", string.Empty).Trim();
totalCountDic.AddOrUpdate(key, 0, (skey, scount) => { return scount = scount + 1; });
}
catch (Exception ext)
{
MessageBox.Show("某行数据第2列可能为空" + ext.ToString());
}
}
//头尾:另外保存
ListModel listmodel = new ListModel(k[0].Replace("\0", string.Empty).Replace("\r", string.Empty).Trim(), endstr);
listModelDic.TryAdd(index, listmodel);
//}
}
//分区时,最后一个为0则忽略。
}
});
}
if (listModelDic.Count() > 0)
{
if (listModelDic.Count() == 1)
{
try
{
string key = listModelDic[1].End;
if (!String.IsNullOrWhiteSpace(key))
totalCountDic.AddOrUpdate(listModelDic[1].End.Split(',')[1], 0, (skey, scount) => { return scount = scount + 1; });
}
catch (Exception ext)
{
MessageBox.Show("只有一块时(文件小):第一行数据第2列可能为空" + ext.ToString());
}
}
else
{
//头尾单独链式处理: 从1开始。第0行的begin 是列名称,不需要分析。
for (int i = 1; i < listModelDic.Count(); i++)
{
try
{
//count = count + 1;//记录计算了多少次,与分块对应(其实就是与listModelDic的key-1对应即代表完整)
string str = "";
if (!String.IsNullOrWhiteSpace(listModelDic[i - 1].End))
{
//上一个的end+本轮的begin拼接,不需要去除/n
str = listModelDic[i - 1].End + listModelDic[i].Begin;
}
else
{
//上一个的end(为空)+本轮的begin拼接,不需要去除/n
str = listModelDic[i].Begin;
}
string key = str.Split(',')[1];
totalCountDic.AddOrUpdate(key, 0, (skey, scount) => { return scount = scount + 1; });
}
catch (Exception ext)
{
MessageBox.Show("头尾单独链式处理:某行数据第2列可能为空" + ext.ToString());
}
}
//最后补充最后一条的end 是没有换行了。所以不需要处理。肯定为null。 keyend.end 是空字符串。
//object keyend = listModelDic[listModelDic.Count()];
}
}
#endregion
}
catch (IOException ex)
{
errorCount = errorCount + 1;
if (errorCount > 2)
{
MessageBox.Show(ex.ToString());
errorCount = 0;
}
else
{
//遇到csv无法文件无法使用时,通过修改扩展名实现。
filepath = Path.ChangeExtension(filepath, ".txt");
//个别CSV\ppt\word等文件类型,则必须强转为任何其他类型的文本后缀结构。否则MemoryMappedFile不识别.
goto setup;
}
}
catch (Exception ex)
{
MessageBox.Show(ex.ToString());
}
#endregion
st.Stop();
MessageBox.Show("运行时间:" + st.ElapsedMilliseconds.ToString());
#region 完整性测试---使用时注释掉。 执行效率 5-6
try
{
//通过一行行的输入,最终获得1个列表B,以B为左,求差集
ConcurrentDictionary<string, int> BtotalCountDic = new ConcurrentDictionary<string, int>();
#region 按行读取代码
//读取文件
Stopwatch swtp = new Stopwatch();
swtp.Start();
count = 0;
StreamReader sr = new StreamReader(filepath, Encoding.Default);
String line;
line = sr.ReadLine();//第一行,头部列名称,不进入集合
while ((line = sr.ReadLine()) != null)
{
try
{
count = count + 1;//记录计算了多少次,与分块对应(其实就是与listModelDic的key-1对应即代表完整)
string key = line.Split(',')[1];
BtotalCountDic.AddOrUpdate(key, 0, (skey, scount) => { return scount = scount + 1; });
}
catch (Exception ext)
{
MessageBox.Show("完整性测试:某行数据第2列可能为空" + ext.ToString());
}
}
swtp.Stop();
MessageBox.Show("完整性测试运行时间:" + swtp.ElapsedMilliseconds.ToString());
#endregion
//可能出现重复,这里的条数与日志文件的条数可能不同。
foreach (var item in BtotalCountDic)
{
Btotal = Btotal + item.Value;
}
//全比较
var dz1 = BtotalCountDic.Except(totalCountDic);
//key比较
var dz2 = BtotalCountDic.Keys.Except(totalCountDic.Keys);
if (dz1.Count() == 0 && dz2.Count() == 0)
{
MessageBox.Show("两个方式结果一致");
}
else
{
MessageBox.Show("完整性测试报错");
}
}
catch (Exception exe1)
{
MessageBox.Show("完整性测试" + exe1.ToString());
}
#endregion
#region 执行完成
int total = 0;
foreach (var item in totalCountDic)
{
total = total + item.Value;
}
if (total == Btotal)
{
MessageBox.Show("正确:点击数和为总条数");
}
else
{
MessageBox.Show("错误:点击数和不等于总条数");
}
MessageBox.Show("结果行数:" + count.ToString());
errorCount = 0;
count = 0;
#endregion
}
}
}
......@@ -12,7 +12,9 @@ C:\Users\admin\Desktop\CsvCount_ES\CsvCount_ES\obj\Debug\CsvCount_ES.exe
C:\Users\admin\Desktop\CsvCount_ES\CsvCount_ES\obj\Debug\CsvCount_ES.pdb
C:\Users\admin\Desktop\CsvCount_ES\CsvCount_ES\bin\Debug\CsvCount_ES.exe
C:\Users\admin\Desktop\CsvCount_ES\CsvCount_ES\bin\Debug\CsvCount_ES.pdb
C:\Users\admin\Desktop\CsvCount_ES\CsvCount_ES\obj\Debug\CsvCount_ES.csprojResolveAssemblyReference.cache
C:\Users\admin\Desktop\CsvCount_ES\CsvCount_ES\obj\Debug\CsvCount_ES.Form1.resources
C:\Users\admin\Desktop\CsvCount_ES\CsvCount_ES\obj\Debug\CsvCount_ES.Properties.Resources.resources
C:\Users\admin\Desktop\CsvCount_ES\CsvCount_ES\obj\Debug\CsvCount_ES.csproj.GenerateResource.Cache
D:\smallproject\6.CsvCount_ES\CsvCount_ES\bin\Debug\CsvCount_ES.exe.config
D:\smallproject\6.CsvCount_ES\CsvCount_ES\obj\Debug\CsvCount_ES.exe
D:\smallproject\6.CsvCount_ES\CsvCount_ES\obj\Debug\CsvCount_ES.pdb
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment