Commit be7c92c7 by mahaisong

fix:

parent ee6bb8b8
using HTCommon.Data;
using HTCommon.Data;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace TaikorES_ImportLocalES
{
/// <summary>
/// Holds the shared map of items whose insert failed, so they can be retried later.
/// </summary>
public static class ErrorItemCache
{
    /// <summary>
    /// Items whose insert failed, keyed by a string identifier
    /// (presumably the ItemID - confirm against the code that adds entries).
    /// </summary>
    /// <remarks>
    /// Individual operations are thread-safe because the map is a
    /// <see cref="ConcurrentDictionary{TKey, TValue}"/>. The field is <c>readonly</c> so the
    /// shared instance cannot be swapped out while other threads are using it.
    /// </remarks>
    public static readonly ConcurrentDictionary<string, Item> ErrorItemMap = new ConcurrentDictionary<string, Item>();
}
}
 
...@@ -6,6 +6,7 @@ using HTCommon.DataAccess; ...@@ -6,6 +6,7 @@ using HTCommon.DataAccess;
using HTCommon.Helper; using HTCommon.Helper;
using MinderESCommon; using MinderESCommon;
using Nest; using Nest;
using Newtonsoft.Json;
using Palas.Protocol; using Palas.Protocol;
using PalasEntityModel; using PalasEntityModel;
using System; using System;
...@@ -157,15 +158,7 @@ namespace TaikorES_ImportLocalES ...@@ -157,15 +158,7 @@ namespace TaikorES_ImportLocalES
try try
{ {
this.Invoke(new MethodInvoker(() =>
{
this.button1.Enabled = false;
this.button1.Visible = false;
this.listBox1.Items.Add("正在执行中……");
}));
//载入文件 //载入文件
string filepath = Environment.CurrentDirectory + "\\MatchCrawlID.csv"; string filepath = Environment.CurrentDirectory + "\\MatchCrawlID.csv";
...@@ -193,6 +186,18 @@ namespace TaikorES_ImportLocalES ...@@ -193,6 +186,18 @@ namespace TaikorES_ImportLocalES
MessageBox.Show(filepath + "文件载入有问题" + ex.ToString()); MessageBox.Show(filepath + "文件载入有问题" + ex.ToString());
} }
this.Invoke(new MethodInvoker(() =>
{
this.button1.Enabled = false;
this.button1.Visible = false;
this.listBox1.Items.Add("正在执行中……");
this.listBox1.Items.Add("文件载入" + MatchCrawlID.Count + "条爬虫");
}));
if (_syncState == null) if (_syncState == null)
{ {
//当为空时,创建一下 //当为空时,创建一下
...@@ -219,7 +224,20 @@ namespace TaikorES_ImportLocalES ...@@ -219,7 +224,20 @@ namespace TaikorES_ImportLocalES
DateTime dtitem = DateTime.Now; DateTime dtitem = DateTime.Now;
long count = 0; long count = 0;
string crawlID = item.Value.ToString(); string crawlID = item.Value.ToString();
bool prevAddMonth = false;
//显示 执行完的爬虫ID。
this.Invoke(new MethodInvoker(() =>
{
this.listBox1.Items.Add("开始时间:" + dtitem.ToString("yyyy-MM-dd HH:mm:ss.fff") + "执行完成;列表中第" + item.Key.ToString() + "个爬虫," + item.Value.ToString() );
}));
#region for
try try
{ {
//对每个爬虫,按照日期查询一遍。 //对每个爬虫,按照日期查询一遍。
...@@ -237,23 +255,27 @@ namespace TaikorES_ImportLocalES ...@@ -237,23 +255,27 @@ namespace TaikorES_ImportLocalES
DateTime? TodayQueryTime = Where_BeginTime; DateTime? TodayQueryTime = Where_BeginTime;
DateTime TodayQueryDate = DateTime.Parse(Where_BeginTime.ToString("yyyy-MM-dd")); DateTime TodayQueryDate = Where_BeginTime;
DateTime TodayEndTime; DateTime TodayEndTime;
if ((Where_ENDTime - Where_BeginTime).TotalDays >= 31) if ((Where_ENDTime - Where_BeginTime).TotalDays >= 365)
{ {
TodayEndTime = DateTime.Parse(Where_BeginTime.ToString("yyyy-MM-dd") + " 23:59:59.999").AddMonths(1); TodayEndTime = DateTime.Parse(Where_BeginTime.ToString("yyyy-MM-dd") + " 23:59:59.999").AddYears(1);
prevAddMonth = true;
} }
else else
{ {
TodayEndTime = DateTime.Parse(Where_BeginTime.ToString("yyyy-MM-dd") + " 23:59:59.999"); TodayEndTime = DateTime.Parse(Where_BeginTime.ToString("yyyy-MM-dd") + " 23:59:59.999");
} }
while (TodayEndTime <= Where_ENDTime.AddDays(1)) while (TodayEndTime <= Where_ENDTime.AddDays(1))
{ {
try
{
GC.Collect();
if (TodayEndTime > Where_ENDTime) if (TodayEndTime > Where_ENDTime)
{ {
TodayEndTime = Where_ENDTime; TodayEndTime = Where_ENDTime;
...@@ -270,8 +292,6 @@ namespace TaikorES_ImportLocalES ...@@ -270,8 +292,6 @@ namespace TaikorES_ImportLocalES
{ {
beginTime = TodayQueryDate; beginTime = TodayQueryDate;
} }
int tryCount = 0; int tryCount = 0;
string todayitemID = "";//按照同一fetchTime时间、按照todayitemID号排序,则查询时也要注意大于上次的按照todayitemID号排序号。 string todayitemID = "";//按照同一fetchTime时间、按照todayitemID号排序,则查询时也要注意大于上次的按照todayitemID号排序号。
...@@ -281,10 +301,13 @@ namespace TaikorES_ImportLocalES ...@@ -281,10 +301,13 @@ namespace TaikorES_ImportLocalES
try try
{ {
//当前爬虫、指定时间范围内 的数据,且排除上一次的最后一条。
var search = new SearchDescriptor<Item>(); var search = new SearchDescriptor<Item>();
search.Query(q => search.Query(q =>
q.Bool(b => b.Must q.Bool(b => b.Must
(m => m.DateRange (m => m.Term(t => t.Field(tf => tf.CrawlID).Value(crawlID)))
.Must(m2 => m2.DateRange
(r => (r =>
r.Field(f => f.FetchTime) r.Field(f => f.FetchTime)
...@@ -294,8 +317,8 @@ namespace TaikorES_ImportLocalES ...@@ -294,8 +317,8 @@ namespace TaikorES_ImportLocalES
) )
).MustNot( ).MustNot(
mn=>mn.Bool(bn=>bn.Must(bnm=> mn => mn.Bool(bn => bn.Must(bnm =>
bnm.Match(nm=>nm.Field(f=>f.ItemID).Query(todayitemID)) bnm.Match(nm => nm.Field(f => f.ItemID).Query(todayitemID))
) )
) )
) )
...@@ -306,7 +329,7 @@ namespace TaikorES_ImportLocalES ...@@ -306,7 +329,7 @@ namespace TaikorES_ImportLocalES
.Sort(st => st.Ascending(asc => asc.FetchTime).Ascending(asc => asc.ItemID))//排序 .Sort(st => st.Ascending(asc => asc.FetchTime).Ascending(asc => asc.ItemID))//排序
; ;
TaiKorNewsList = ESAccess.Search<Item>(search).ToList<Item>(); TaiKorNewsList = ESAccess.Search_SearchDetail<Item>(search).ToList<Item>();
} }
catch (Exception ex) catch (Exception ex)
...@@ -318,7 +341,7 @@ namespace TaikorES_ImportLocalES ...@@ -318,7 +341,7 @@ namespace TaikorES_ImportLocalES
} }
//失败--重试3次 //失败--重试3次
Logger.Error(TodayQueryDate.ToString() + "日期查询ES时重试三次依然错误,无法连接或异常,请检查!" + ex.ToString()); Logger.Error(beginTime.ToString()+"-" +TodayEndTime.ToString() + "日期查询ES时重试三次依然错误,无法连接或异常,请检查!" + ex.ToString());
} }
//处理 //处理
...@@ -328,48 +351,16 @@ namespace TaikorES_ImportLocalES ...@@ -328,48 +351,16 @@ namespace TaikorES_ImportLocalES
if (TaiKorNewsList != null && TaiKorNewsList.Count > 0) if (TaiKorNewsList != null && TaiKorNewsList.Count > 0)
{ {
count = TaiKorNewsList.Count + TaiKorNewsList.Count;
//数据转存 //数据转存
//最后1条记录 的ID和发布时间 更新到进度表 //最后1条记录 的ID和发布时间 更新到进度表
_syncState.last_itemid = TaiKorNewsList.LastOrDefault().ItemID.ToString(); _syncState.last_itemid = TaiKorNewsList.LastOrDefault().ItemID.ToString();
_syncState.last_pubdate = TaiKorNewsList.LastOrDefault().FetchTime; _syncState.last_pubdate = TaiKorNewsList.LastOrDefault().FetchTime;
//动态分区: 继承自OrderablePartitioner类, 在AsParallel 时会将数据拆分(按照自定义分区规则)
OrderableListPartitioner<Item> beigeNews = new OrderableListPartitioner<Item>(TaiKorNewsList);
// Use with PLINQ
beigeNews.AsParallel().ForAll(newItem =>
{
//将这个Item加入到ES数据库中
try
{
int retrytime = 0;
do
{
retrytime++;
try
{
MinderESAccess.Index(newItem, 0);
break;
}
catch (Exception ex)
{
Logger.Error("Import to redis mq error! " + ex.ToString());
}
}
while (retrytime < 20);
MinderESAccess.BulkInsertAsync<Item>(TaiKorNewsList.ToArray<Item>());
}
catch (Exception ext)
{
String msg = String.Format(CultureInfo.InvariantCulture,
"Failed to persist item. ItemID {0}, Exception {1}",
newItem.ItemID, ext.ToString());
Logger.Error(msg);
}
});
if (_syncState != null) if (_syncState != null)
{ {
...@@ -391,7 +382,7 @@ namespace TaikorES_ImportLocalES ...@@ -391,7 +382,7 @@ namespace TaikorES_ImportLocalES
try try
{ {
this.listBox1.Items.Add("时间:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + " 数据月期:" + TodayQueryDate.ToString("yyyy-MM") + "插入ES" + TaiKorNewsList.Count + "条;当前进度为:"+ ((DateTime)(_syncState.last_pubdate)).ToString("yyyy - MM - dd HH: mm:ss.fff")); this.listBox1.Items.Add("时间:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + " 数据月期:" + TodayQueryDate.ToString("yyyy-MM") + "插入ES" + TaiKorNewsList.Count + "条;当前进度为:" + ((DateTime)(_syncState.last_pubdate)).ToString("yyyy - MM - dd HH: mm:ss.fff"));
} }
catch { } catch { }
...@@ -420,33 +411,33 @@ namespace TaikorES_ImportLocalES ...@@ -420,33 +411,33 @@ namespace TaikorES_ImportLocalES
htItemContext.SyncOffset.AddOrUpdate(_syncState); htItemContext.SyncOffset.AddOrUpdate(_syncState);
htItemContext.SaveChanges(); htItemContext.SaveChanges();
} }
string msg = string.Format("Success sync hj beige news at {0}, this time index {1} items.", DateTime.Now, TaiKorNewsList.Count);
Logger.Info(msg);
}
} }
}
if((Where_ENDTime - TodayEndTime).TotalDays >= 31)
{
//当月执行完,执行下一月。
TodayQueryDate = TodayQueryDate.AddDays(1).AddMonths(1);
TodayEndTime = TodayEndTime.AddDays(1).AddMonths(1);
} }
else catch (Exception ex)
{ {
//如果上一次是加月的情况,则TodayQueryDate单独向后移动1个月
if (prevAddMonth) Logger.Error("列表中第" + item.Key.ToString() + "个爬虫:" + item.Value.ToString() + ",while时错误!" + ex.ToString() + "当前日期为:" + TodayEndTime.ToString("yyyy-MM-dd HH:mm:ss.fff"));
}
if (TodayEndTime == Where_ENDTime)
{ {
prevAddMonth = false; break;
}
TodayQueryDate = DateTime.Parse(TodayEndTime.AddDays(1).ToString("yyyy-MM-dd")); TodayQueryDate = DateTime.Parse(TodayEndTime.AddDays(1).ToString("yyyy-MM-dd"));
if ((Where_ENDTime - TodayEndTime).TotalDays >= 365)
{
//当月执行完,执行下一年。
TodayEndTime = TodayEndTime.AddDays(1).AddYears(1);
} }
else else
{ {
//日常加1天
TodayQueryDate = TodayQueryDate.AddDays(1);
}
TodayEndTime = TodayEndTime.AddDays(1); TodayEndTime = TodayEndTime.AddDays(1);
} }
...@@ -470,6 +461,33 @@ namespace TaikorES_ImportLocalES ...@@ -470,6 +461,33 @@ namespace TaikorES_ImportLocalES
})); }));
} }
if (ErrorItemCache.ErrorItemMap.Count > 0)
{
this.Invoke(new MethodInvoker(() =>
{
this.listBox1.Items.Add("执行补录;曾经插入报错的,重新插入" + ErrorItemCache.ErrorItemMap.Count.ToString() + "条记录: ");
}));
while (ErrorItemCache.ErrorItemMap.Count > 0)
{
var idvalue= ErrorItemCache.ErrorItemMap.ElementAt(0);
Item testmodel;
ErrorItemCache.ErrorItemMap.TryRemove(idvalue.Key, out testmodel);
//重新发起请求,请求指定的ID号的数据,进行插入。
MinderESAccess.IndexOrUpdate(testmodel, testmodel.ItemID);
}
this.Invoke(new MethodInvoker(() =>
{
this.listBox1.Items.Add("执行完成补录");
}));
}
#endregion
} }
} }
catch (Exception ex) catch (Exception ex)
......
<?xml version="1.0" encoding="utf-8"?> <?xml version="1.0" encoding="utf-8"?>
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
<TargetFrameworkProfile /> <TargetFrameworkProfile />
</PropertyGroup> </PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget> <PlatformTarget>x64</PlatformTarget>
<DebugSymbols>true</DebugSymbols> <DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType> <DebugType>full</DebugType>
<Optimize>false</Optimize> <Optimize>false</Optimize>
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
<DefineConstants>DEBUG;TRACE</DefineConstants> <DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport> <ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel> <WarningLevel>4</WarningLevel>
<Prefer32Bit>false</Prefer32Bit>
</PropertyGroup> </PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' "> <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget> <PlatformTarget>AnyCPU</PlatformTarget>
...@@ -108,6 +109,7 @@ ...@@ -108,6 +109,7 @@
<Reference Include="System.Xml" /> <Reference Include="System.Xml" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Compile Include="ErrorItemCache.cs" />
<Compile Include="Form1.cs"> <Compile Include="Form1.cs">
<SubType>Form</SubType> <SubType>Form</SubType>
</Compile> </Compile>
......
D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\RemoteImportLocalES\bin\Debug\TaikorES_ImportLocalES.exe.config D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\RemoteImportLocalES\bin\Debug\TaikorES_ImportLocalES.exe.config
...@@ -65,6 +65,7 @@ D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\RemoteImp ...@@ -65,6 +65,7 @@ D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\RemoteImp
D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\RemoteImportLocalES\obj\Debug\TaikorES_ImportLocalES.csproj.GenerateResource.Cache D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\RemoteImportLocalES\obj\Debug\TaikorES_ImportLocalES.csproj.GenerateResource.Cache
D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\RemoteImportLocalES\obj\Debug\TaikorES_ImportLocalES.exe D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\RemoteImportLocalES\obj\Debug\TaikorES_ImportLocalES.exe
D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\RemoteImportLocalES\obj\Debug\TaikorES_ImportLocalES.pdb D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\RemoteImportLocalES\obj\Debug\TaikorES_ImportLocalES.pdb
D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\NPOI.dll
D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\MatchCrawlID.csv D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\MatchCrawlID.csv
D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\TaikorES_ImportLocalES.exe.config D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\TaikorES_ImportLocalES.exe.config
D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\TaikorES_ImportLocalES.exe D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\TaikorES_ImportLocalES.exe
...@@ -108,7 +109,6 @@ D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ ...@@ -108,7 +109,6 @@ D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_
D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\IKVM.OpenJDK.Naming.dll D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\IKVM.OpenJDK.Naming.dll
D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\IKVM.OpenJDK.Beans.dll D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\IKVM.OpenJDK.Beans.dll
D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\IKVM.OpenJDK.Jdbc.dll D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\IKVM.OpenJDK.Jdbc.dll
D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\NPOI.dll
D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\NPOI.OpenXmlFormats.dll D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\NPOI.OpenXmlFormats.dll
D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\NPOI.OpenXml4Net.dll D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\NPOI.OpenXml4Net.dll
D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\ServiceStack.Common.dll D:\smallproject\10.Paul需求-JSON贝格ES导入本地ES\ImportLocalES\TaikorES_ImportLocalES\bin\Debug\ServiceStack.Common.dll
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment