Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
S
smallproject
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
mahaisong
smallproject
Commits
745f30b6
Commit
745f30b6
authored
Jun 04, 2018
by
mahaisong
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fix:
parent
79a0baa7
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
473 additions
and
0 deletions
+473
-0
.suo
10.Paul需求-JSON贝格ES导入本地ES/MinderES/.vs/MinderES/v15/.suo
+0
-0
db.lock
...ES导入本地ES/MinderES/.vs/MinderES/v15/Server/sqlite3/db.lock
+1
-0
storage.ide
...本地ES/MinderES/.vs/MinderES/v15/Server/sqlite3/storage.ide
+0
-0
BeigeNews.cs
...JSON贝格ES导入本地ES/MinderES/BeigeOracle_MinderES/BeigeNews.cs
+47
-0
BeigeESJob.cs
...ES/MinderES/BeigeOracle_MinderES/QuartzJobs/BeigeESJob.cs
+233
-0
MatchCrawlID.csv
...SON贝格ES导入本地ES/MinderES/TaikorES_MinderES/MatchCrawlID.csv
+0
-0
TaikorES.cs
...S导入本地ES/MinderES/TaikorES_MinderES/QuartzJobs/TaikorES.cs
+192
-0
No files found.
10.Paul需求-JSON贝格ES导入本地ES/MinderES/.vs/MinderES/v15/.suo
0 → 100644
View file @
745f30b6
File added
10.Paul需求-JSON贝格ES导入本地ES/MinderES/.vs/MinderES/v15/Server/sqlite3/db.lock
0 → 100644
View file @
745f30b6
++ "b/10.Paul\351\234\200\346\261\202-JSON\350\264\235\346\240\274ES\345\257\274\345\205\245\346\234\254\345\234\260ES/MinderES/.vs/MinderES/v15/Server/sqlite3/db.lock"
10.Paul需求-JSON贝格ES导入本地ES/MinderES/.vs/MinderES/v15/Server/sqlite3/storage.ide
0 → 100644
View file @
745f30b6
File added
10.Paul需求-JSON贝格ES导入本地ES/MinderES/BeigeOracle_MinderES/BeigeNews.cs
0 → 100644
View file @
745f30b6
using
System
;
using
System
;
using
System.Collections.Generic
;
using
System.Linq
;
using
System.Text
;
using
System.Threading.Tasks
;
namespace BeigeOracle_MinderES
{
    /// <summary>
    /// Beige news record (贝格新闻) as selected out of the Oracle source tables.
    /// Plain data holder; one instance per news row.
    /// </summary>
    public class BeigeNews
    {
        /// <summary>Beige news ID (primary key of the source row).</summary>
        public long ID { get; set; }

        /// <summary>Media (publication) name.</summary>
        public string MED_NAME { get; set; }

        /// <summary>Publish time of the article.</summary>
        public DateTime PUB_TIME { get; set; }

        /// <summary>Author.</summary>
        public string AUT { get; set; }

        /// <summary>Title.</summary>
        public string TIT { get; set; }

        /// <summary>Link address (article URL).</summary>
        public string LNK_ADDR { get; set; }

        /// <summary>
        /// Content format code. Nullable because the body table is left-joined.
        /// NOTE(review): exact semantics unknown — the original comment itself
        /// guessed "some kind of type?"; confirm against the Oracle schema.
        /// </summary>
        public int? CONT_FMT { get; set; }

        /// <summary>Content body (HTML/text).</summary>
        public string CONT { get; set; }
    }
}
10.Paul需求-JSON贝格ES导入本地ES/MinderES/BeigeOracle_MinderES/QuartzJobs/BeigeESJob.cs
0 → 100644
View file @
745f30b6
using
HJBeigeModel
;
using
HJBeigeModel
;
using
HTCommon.Data
;
using
HTCommon.DataAccess
;
using
HTCommon.Helper
;
using
MinderESCommon
;
using
Oracle.ManagedDataAccess.Client
;
using
PalasEntityModel
;
using
Quartz
;
using
System
;
using
System.Collections.Generic
;
using
System.Data
;
using
System.Data.Entity.Migrations
;
using
System.Globalization
;
using
System.Linq
;
using
System.Text
;
using
System.Threading.Tasks
;
namespace BeigeOracle_MinderES.QuartzJobs
{
    /// <summary>
    /// Quartz job that incrementally copies Beige news rows (TXT_NWS_BAS left-joined
    /// with TXT_NWS_BAS_TXT) out of Oracle and indexes them into the local MinderES
    /// cluster. Progress is persisted in the SyncOffset table under <see cref="sync_name"/>
    /// so each run resumes after the last item id reached by the previous run.
    /// </summary>
    public class BeigeESJob : IJob
    {
        // Key used in the SyncOffset progress table for this sync source.
        private static string sync_name = "HuaJinNews_Beige";

        // Page size: number of rows pulled from Oracle per round trip.
        private static Int32 Query_Count = 500;

        /// <summary>
        /// Job entry point. Loads the saved sync offset, then pages through Oracle
        /// rows with ID &gt; lastId, indexing each page into ES in parallel and
        /// saving the new offset, until a partial (or empty) page is returned.
        /// </summary>
        /// <param name="context">Quartz execution context (unused).</param>
        public void Execute(IJobExecutionContext context)
        {
            try
            {
                int totalcount = 0;
                // Last item id reached by the previous run.
                long lastId = 0;
                // Saved sync state; null on the very first run.
                SyncOffset _syncState = null;
                using (PalasEntityContext palasEntityContext = new PalasEntityContext())
                {
                    _syncState = palasEntityContext.SyncOffset
                        .Where(f => f.source_name == sync_name)
                        .FirstOrDefault();
                }
                if (_syncState != null && !string.IsNullOrWhiteSpace(_syncState.last_itemid))
                {
                    long.TryParse(_syncState.last_itemid, out lastId);
                }
                else
                {
                    // No saved state: create a fresh offset and start from 0.
                    _syncState = new SyncOffset();
                    _syncState.last_itemid = "0";
                    lastId = 0;
                    _syncState.create_date = DateTime.Now;
                }
                int tryCount = 0;
            TagToday:
                List<BeigeNews> BeigeNewsList = null;
                try
                {
                    // Fetch the next page after lastId. TXT_NWS_BAS.ID is an
                    // auto-increment key, so rows can be consumed in order; the body
                    // table TXT_NWS_BAS_TXT is left-joined because a row may have no
                    // body text (no matching ORIG_ID).
                    using (HJBeigeContext hjBeigeContext = new HJBeigeContext())
                    {
                        var data = from o in hjBeigeContext.TXT_NWS_BAS
                                   join d in hjBeigeContext.TXT_NWS_BAS_TXT on o.ID equals d.ORIG_ID into dc
                                   from dci in dc.DefaultIfEmpty()
                                   where o.ID > lastId
                                   orderby o.PUB_DT, o.ID
                                   select new BeigeNews
                                   {
                                       ID = o.ID,
                                       MED_NAME = o.MED_NAME,
                                       PUB_TIME = o.PUB_DT,
                                       AUT = o.AUT,
                                       TIT = o.TIT,
                                       LNK_ADDR = o.LNK_ADDR,
                                       CONT_FMT = dci.CONT_FMT,
                                       CONT = dci.CONT
                                   };
                        // BUGFIX: the original called data.ToList().Take(Query_Count).ToList(),
                        // which materialized the ENTIRE remaining result set in memory before
                        // discarding all but one page. Applying Take() on the IQueryable lets
                        // the provider translate the limit into the SQL itself.
                        BeigeNewsList = data.Take(Query_Count).ToList();
                    }
                }
                catch (Exception ex)
                {
                    if (tryCount < 3)
                    {
                        tryCount = tryCount + 1;
                        goto TagToday;
                    }
                    // Still failing after 3 retries: log and fall through (the
                    // null/empty checks below keep this run from crashing).
                    LogService.WriteError(DateTime.Now.ToString() + "日期查询数据库时重试三次依然错误,无法连接或异常,请检查!" + ex.ToString());
                    Console.WriteLine(DateTime.Now.ToString() + "日期查询数据库时重试三次依然错误,无法连接或异常,请检查!" + ex.ToString());
                }
                // Refresh bookkeeping fields on the sync state.
                _syncState.source_name = sync_name;
                _syncState.modify_date = DateTime.Now;
                _syncState.memo = "贝格新闻数据同步";
                if (BeigeNewsList != null && BeigeNewsList.Count > 0)
                {
                    // Record the last row's id / publish time as the new resume point.
                    totalcount = totalcount + BeigeNewsList.Count;
                    _syncState.last_itemid = BeigeNewsList.LastOrDefault().ID.ToString();
                    _syncState.last_pubdate = BeigeNewsList.LastOrDefault().PUB_TIME;
                    // Custom OrderablePartitioner: AsParallel() splits the page
                    // according to the partitioner's own rules.
                    OrderableListPartitioner<BeigeNews> beigeNews =
                        new OrderableListPartitioner<BeigeNews>(BeigeNewsList);
                    // Use with PLINQ: index every row of the page in parallel.
                    beigeNews.AsParallel().ForAll(newItem =>
                    {
                        // Map the Oracle row onto the ES Item document.
                        Item item = new Item();
                        item.Crawler = "Oracle_Beige_NEWS";
                        item.CrawlID = "Oracle_Beige_NEWS";
                        item.AuthorName = newItem.AUT;
                        item.CleanTitle = newItem.TIT;
                        item.ClientItemID = newItem.ID.ToString();
                        item.MediaName = newItem.MED_NAME;
                        item.PubDate = newItem.PUB_TIME;
                        item.Url = newItem.LNK_ADDR;
                        item.HTMLText = newItem.CONT;
                        item.FetchTime = newItem.PUB_TIME;
                        item.CleanText = HTMLCleaner.CleanHTML(newItem.CONT, true);
                        item.ItemID = HTCommon.Helper.MD5Helper.getMd5Hash(item.Crawler + item.ClientItemID);
                        item.Tag = "MinderESFromBeigeOracle";
                        // Index this Item into ES, retrying up to 20 times.
                        try
                        {
                            int retrytime = 0;
                            do
                            {
                                retrytime++;
                                try
                                {
                                    MinderESAccess.Index<Item>(item, 0);
                                    break;
                                }
                                catch (Exception ex)
                                {
                                    LogService.WriteError("Import to ES error! " + ex.ToString());
                                    Console.WriteLine("Import to ES error! " + ex.ToString());
                                }
                            }
                            while (retrytime < 20);
                        }
                        catch (Exception ext)
                        {
                            String msg = String.Format(CultureInfo.InvariantCulture,
                                "Failed to persist item. ItemID {0}, Exception {1}",
                                item.ItemID, ext.ToString());
                            LogService.WriteError(msg);
                            Console.WriteLine(msg);
                        }
                    });
                    if (_syncState != null)
                    {
                        // Persist progress after the page has been indexed.
                        using (PalasEntityContext htItemContext = new PalasEntityContext())
                        {
                            htItemContext.SyncOffset.AddOrUpdate(_syncState);
                            htItemContext.SaveChanges();
                        }
                        string msg = string.Format(
                            "Success sync hj beige news at {0}, this time index {1} items.",
                            DateTime.Now, BeigeNewsList.Count);
                        LogService.WriteInfo(msg);
                        Console.WriteLine(msg);
                    }
                }
                if (null != BeigeNewsList && BeigeNewsList.Count == Query_Count)
                {
                    // A full page implies more rows probably remain: loop again.
                    tryCount = 0;
                    lastId = long.Parse(BeigeNewsList.LastOrDefault().ID.ToString());
                    goto TagToday;
                }
                if (null == BeigeNewsList || 0 == BeigeNewsList.Count)
                {
                    if (_syncState != null)
                    {
                        // Nothing new this round; still persist the refreshed state.
                        using (PalasEntityContext htItemContext = new PalasEntityContext())
                        {
                            htItemContext.SyncOffset.AddOrUpdate(_syncState);
                            htItemContext.SaveChanges();
                        }
                        // BUGFIX: the original formatted BeigeNewsList.Count here, which
                        // throws NullReferenceException when the list is null (the very
                        // condition this branch allows).
                        string msg = string.Format(
                            "Success sync hj beige news at {0}, this time index {1} items.",
                            DateTime.Now, BeigeNewsList == null ? 0 : BeigeNewsList.Count);
                        LogService.WriteInfo(msg);
                        Console.WriteLine(msg);
                    }
                }
                LogService.WriteInfo("完成:BeigeOracle_MinderES:" + totalcount + "条;" + DateTime.Now.ToString() + " -lastId: " + lastId + ",数据均以导入到MinderES中,请检查!");
                Console.WriteLine("完成:BeigeOracle_MinderES:" + totalcount + "条;" + DateTime.Now.ToString() + " - lastId:" + lastId + ",数据均以导入到MinderES中,请检查!");
            }
            catch (Exception ex)
            {
                LogService.WriteError(ex.ToString());
            }
        }
    }
}
10.Paul需求-JSON贝格ES导入本地ES/MinderES/TaikorES_MinderES/MatchCrawlID.csv
0 → 100644
View file @
745f30b6
This source diff could not be displayed because it is too large. You can
view the blob
instead.
10.Paul需求-JSON贝格ES导入本地ES/MinderES/TaikorES_MinderES/QuartzJobs/TaikorES.cs
0 → 100644
View file @
745f30b6
using
HTCommon.Data
;
using
HTCommon.Data
;
using
HTCommon.DataAccess
;
using
MinderESCommon
;
using
Nest
;
using
Quartz
;
using
System
;
using
System.Collections.Concurrent
;
using
System.Collections.Generic
;
using
System.IO
;
using
System.Linq
;
using
System.Text
;
using
System.Threading
;
using
System.Threading.Tasks
;
namespace TaikorES_MinderES.QuartzJobs
{
    /// <summary>
    /// Quartz job that copies yesterday's documents from the Taikor ES cluster into
    /// the local MinderES cluster, one crawler at a time. The crawler ids are read
    /// from MatchCrawlID.csv in the current directory.
    /// </summary>
    public class TaikorES : IJob
    {
        // Page size per ES search request.
        static Int32 Query_Count = 100;

        /// <summary>
        /// Job entry point. Loads crawler ids, then for each crawler pages through
        /// yesterday's documents (sorted by FetchTime, then ItemID) and bulk-inserts
        /// them into MinderES.
        /// </summary>
        /// <param name="context">Quartz execution context (unused).</param>
        public void Execute(IJobExecutionContext context)
        {
            int totalcount = 0;
            // sequence number -> crawler id, in file order.
            ConcurrentDictionary<long, string> MatchCrawlID = new ConcurrentDictionary<long, string>();
            Int64 sn = 0;
            try
            {
                // Load the crawler-id list file.
                string filepath = Environment.CurrentDirectory + "\\MatchCrawlID.csv";
                try
                {
                    FileInfo fi = new FileInfo(filepath);
                    if (!fi.Exists)
                    {
                        LogService.WriteError("此文件不存在:" + filepath);
                        Console.WriteLine("此文件不存在:" + filepath);
                        return;
                    }
                    // BUGFIX: the original StreamReader was never disposed; the using
                    // block guarantees the file handle is released.
                    using (StreamReader sr = new StreamReader(filepath, Encoding.Default))
                    {
                        String line = sr.ReadLine();
                        // First line is a header only when it contains "CrawlID";
                        // otherwise treat it as data.
                        // BUGFIX: guard against an empty file — ReadLine() returns null
                        // and the original would have thrown NullReferenceException.
                        if (line != null && !line.Contains("CrawlID"))
                        {
                            MatchCrawlID.TryAdd(
                                Interlocked.Increment(ref sn),
                                line.ToString().Trim().Replace("\r", string.Empty).Replace("\n", string.Empty));
                        }
                        while ((line = sr.ReadLine()) != null)
                        {
                            MatchCrawlID.TryAdd(
                                Interlocked.Increment(ref sn),
                                line.ToString().Trim().Replace("\r", string.Empty).Replace("\n", string.Empty));
                        }
                    }
                }
                catch (Exception ex)
                {
                    LogService.WriteError(filepath + "文件载入有问题" + ex.ToString());
                    Console.WriteLine(filepath + "文件载入有问题" + ex.ToString());
                    return;
                }
                // Query window: yesterday 00:00:00.000 through yesterday 23:59:59.999.
                DateTime TodayQueryDate = DateTime.Parse(DateTime.Now.ToString("yyyy-MM-dd") + " 0:0:0.000").AddDays(-1);
                DateTime TodayEndTime = DateTime.Parse(DateTime.Now.ToString("yyyy-MM-dd") + " 23:59:59.999").AddDays(-1);
                foreach (var item in MatchCrawlID)
                {
                    // NOTE(review): the original forced GC.Collect() here and before
                    // every page; removed — explicit collection in production code
                    // only hurts throughput and has no functional effect.
                    try
                    {
                        int tryCount = 0;
                        string todayitemID = "";
                        // Many rows share an identical FetchTime, so paging excludes the
                        // previous page's last ItemID and sorts by (FetchTime, ItemID).
                        string crawlID = item.Value.ToString();
                        DateTime beginTime = TodayQueryDate;
                    TagToday:
                        List<Item> TaiKorNewsList = null;
                        try
                        {
                            // Current crawler, within the window, excluding the item
                            // already copied at the end of the previous page.
                            var search = new SearchDescriptor<Item>();
                            search.Query(q => q.Bool(b => b
                                    .Must(
                                        m => m.Term(t => t.Field(tf => tf.CrawlID).Value(crawlID)),
                                        m2 => m2.DateRange(r => r.Field(f => f.FetchTime)
                                            .GreaterThanOrEquals(beginTime)
                                            .LessThanOrEquals(TodayEndTime)
                                            .TimeZone("+08:00")))
                                    .MustNot(mn => mn.Match(nm => nm.Field(f => f.ItemID).Query(todayitemID)))))
                                .From(0)            // number of hits to skip
                                .Size(Query_Count)  // number of hits to return
                                .Sort(st => st.Ascending(asc => asc.FetchTime).Ascending(asc => asc.ItemID)); // stable paging order
                            TaiKorNewsList = ESAccess.Search_SearchDetail<Item>(search).ToList<Item>();
                        }
                        catch (Exception ex)
                        {
                            if (tryCount < 3)
                            {
                                tryCount = tryCount + 1;
                                goto TagToday;
                            }
                            // Still failing after 3 retries: log and move on to the
                            // next crawler (list stays null, checks below handle it).
                            LogService.WriteError("爬虫:" + item.Value.ToString() + " - " + DateTime.Now.ToString() + "日期查询ES时重试三次依然错误,无法连接或异常,请检查!" + ex.ToString());
                            Console.WriteLine("爬虫:" + item.Value.ToString() + " - " + DateTime.Now.ToString() + "日期查询ES时重试三次依然错误,无法连接或异常,请检查!" + ex.ToString());
                        }
                        if (TaiKorNewsList != null && TaiKorNewsList.Count > 0)
                        {
                            totalcount = totalcount + TaiKorNewsList.Count;
                            // Tag every document so its origin is visible in MinderES.
                            foreach (Item iListtem in TaiKorNewsList)
                            {
                                iListtem.Tag = "MinderESFromTaikorES";
                            }
                            // NOTE(review): fire-and-forget — the async bulk insert is
                            // not awaited, matching the original behavior.
                            MinderESAccess.BulkInsertAsync<Item>(TaiKorNewsList.ToArray<Item>());
                            string msg = string.Format(
                                "Success sync TaikorES news at {0}, this time index {1} items.",
                                DateTime.Now, TaiKorNewsList.Count);
                            LogService.WriteInfo(msg);
                            //Console.WriteLine(msg);
                        }
                        if (null != TaiKorNewsList && TaiKorNewsList.Count == Query_Count)
                        {
                            // A full page means more rows remain for this crawler:
                            // advance the cursor and page again.
                            tryCount = 0;
                            todayitemID = TaiKorNewsList.LastOrDefault().ItemID.ToString();
                            // Pitfall: lots of rows share one midnight timestamp, hence
                            // the ItemID exclusion in the query above.
                            beginTime = TaiKorNewsList.LastOrDefault().FetchTime;
                            goto TagToday;
                        }
                    }
                    catch (Exception ex)
                    {
                        LogService.WriteError("列表中第" + item.Key.ToString() + "个爬虫:" + item.Value.ToString() + ",执行时错误!" + ex.ToString() + "数据日期为:" + TodayEndTime.ToString("yyyy-MM-dd"));
                        Console.WriteLine("列表中第" + item.Key.ToString() + "个爬虫:" + item.Value.ToString() + ",执行时错误!" + ex.ToString() + "数据日期为:" + TodayEndTime.ToString("yyyy-MM-dd"));
                    }
                }
                LogService.WriteInfo("完成:TaikorES_MinderES:" + totalcount + "条;" + TodayQueryDate.ToString() + " - " + TodayEndTime.ToString() + "4W爬虫抓取的所有数据均以导入到MinderES中,请检查!");
                Console.WriteLine("完成:TaikorES_MinderES:" + totalcount + "条;" + TodayQueryDate.ToString() + " - " + TodayEndTime.ToString() + "4W爬虫抓取的所有数据均以导入到MinderES中,请检查!");
            }
            catch (Exception ex)
            {
                LogService.WriteError(ex.ToString());
                Console.WriteLine(ex.ToString());
            }
        }
    }
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment