Ultimate Spider is a tool for collecting data from the web. This article describes Ultimate Spider's overall architecture and some of its technical highlights.

Ultimate Spider has two main pages: 1. a dashboard page that presents overview information; 2. a spider configuration page.

Dashboard page

Spider configuration page

The overall architecture is shown in the figure below:

Overall architecture

Spider Configuration

On top of the WebMagic crawler framework, the initial URLs and the content to extract are configured visually, which makes crawl rules more flexible and gives the tool a wider range of application. A sketch of how such a configuration might be wired into WebMagic follows the listing below.

  • SpiderProcessor.java: crawls pages according to the configured crawl rules
@Slf4j
@EnableConfigurationProperties({SpiderProperties.class})
public class SpiderProcessor implements PageProcessor {

    private SpiderProperties spiderProperties;

    private String spiderName;

    private Crawler crawler;

    public SpiderProcessor(SpiderProperties spiderProperties, String spiderName, Crawler crawler) {
        this.spiderProperties = spiderProperties;
        this.spiderName = spiderName;
        this.crawler = crawler;
    }

    @Override
    public void process(Page page) {
        if (CollectionUtils.isNotEmpty(crawler.getCrawlConfigs())) {
            page.putField(Constants.SPIDER_NAME, spiderName);
            JSONObject content = new JSONObject();
            for (Crawler.CrawlConfig crawlConfig : crawler.getCrawlConfigs()) {
                if (CollectionUtils.isNotEmpty(crawlConfig.getCrawlRules())) {
                    Selectable selector = page.getHtml();
                    // apply the configured extraction rules in order, narrowing the selection step by step
                    for (Crawler.CrawlRule crawlRule : crawlConfig.getCrawlRules()) {
                        switch (crawlRule.getCrawlType()) {
                            case XPATH:
                                selector = selector.xpath(crawlRule.getRule());
                                break;
                            case CSS:
                                if (StringUtils.isBlank(crawlRule.getAttr())) {
                                    selector = selector.css(crawlRule.getRule());
                                } else {
                                    selector = selector.css(crawlRule.getRule(), crawlRule.getAttr());
                                }
                                break;
                            case LINKS:
                                selector = selector.links();
                                break;
                            case REGEX:
                                selector = selector.regex(crawlRule.getRule());
                                break;
                            case REGEX_WITH_GROUP:
                                selector = selector.regex(crawlRule.getRule(), crawlRule.getGroup());
                                break;
                            case REPLACE:
                                selector = selector.replace(crawlRule.getRule(), crawlRule.getReplacement());
                                break;
                            default:
                                log.warn("not support crawl rule type: {}", crawlRule.getCrawlType());
                        }
                    }
                    // the extracted values either become new target URLs or fields of the result document
                    if (crawlConfig.isMultiResult()) {
                        List<String> value = selector.all();
                        if (crawlConfig.isNullSkip() && CollectionUtils.isEmpty(value)) {
                            page.setSkip(true);
                            break;
                        }
                        if (crawlConfig.getCrawlResultType() == Crawler.CrawlResultType.TARGET_URL) {
                            page.addTargetRequests(value);
                        } else if (crawlConfig.getCrawlResultType() == Crawler.CrawlResultType.TEXT) {
                            content.put(crawlConfig.getCrawlKey(), value);
                        } else {
                            log.warn("not support crawl result type: {}", crawlConfig.getCrawlResultType());
                        }
                    } else {
                        String value = selector.get();
                        if (crawlConfig.isNullSkip() && StringUtils.isBlank(value)) {
                            page.setSkip(true);
                            break;
                        }
                        if (crawlConfig.getCrawlResultType() == Crawler.CrawlResultType.TARGET_URL) {
                            page.addTargetRequest(value);
                        } else if (crawlConfig.getCrawlResultType() == Crawler.CrawlResultType.TEXT) {
                            content.put(crawlConfig.getCrawlKey(), value);
                        } else {
                            log.warn("not support crawl result type: {}", crawlConfig.getCrawlResultType());
                        }
                    }
                }
            }
            content.put(Constants.CRAWL_AT, new Date());
            page.putField(Constants.SPIDER_CONTENT, content);
        }
    }

    @Override
    public Site getSite() {
        return Site.me()
                .setRetryTimes(spiderProperties.getRetryTimes())
                .setRetrySleepTime(spiderProperties.getRetrySleepTime())
                .setSleepTime(spiderProperties.getSleepTime())
                .setTimeOut(spiderProperties.getTimeout());
    }
}
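
To make the rule model concrete, here is a minimal sketch of how a configured crawler might be handed to WebMagic. Only the Spider API (Spider.create, addUrl, thread, run) is WebMagic's own; the Crawler, CrawlConfig and CrawlRule setters and the Crawler.CrawlType enum location are assumptions about the configuration beans (for example Lombok-generated) and are purely illustrative.

// Hypothetical wiring (setters and enum names below are assumed, not taken from the project):
// one config collects list-page links as new target URLs, another extracts the article title.
Crawler.CrawlRule linkRule = new Crawler.CrawlRule();
linkRule.setCrawlType(Crawler.CrawlType.XPATH);
linkRule.setRule("//div[@class='post-list']//a/@href");

Crawler.CrawlConfig linkConfig = new Crawler.CrawlConfig();
linkConfig.setCrawlRules(Lists.newArrayList(linkRule));
linkConfig.setMultiResult(true);                                    // take all matches
linkConfig.setCrawlResultType(Crawler.CrawlResultType.TARGET_URL);  // feed matches back as new requests

Crawler.CrawlRule titleRule = new Crawler.CrawlRule();
titleRule.setCrawlType(Crawler.CrawlType.XPATH);
titleRule.setRule("//h1/text()");

Crawler.CrawlConfig titleConfig = new Crawler.CrawlConfig();
titleConfig.setCrawlRules(Lists.newArrayList(titleRule));
titleConfig.setCrawlResultType(Crawler.CrawlResultType.TEXT);
titleConfig.setCrawlKey("title");    // field name in the stored result document
titleConfig.setNullSkip(true);       // skip pages that have no title

Crawler crawler = new Crawler();
crawler.setCrawlConfigs(Lists.newArrayList(linkConfig, titleConfig));

// spiderProperties would come from Spring configuration; the Spider calls are WebMagic's API.
Spider.create(new SpiderProcessor(spiderProperties, "demo-spider", crawler))
        .addUrl("https://example.com/posts")
        .thread(2)
        .run();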

Spider Execution

The Quartz scheduling framework is used to schedule spider tasks, so a task can be paused and resumed at any time.

  • TaskServiceImpl.java: implementation of the spider task service
@Slf4j
@Service
public class TaskServiceImpl implements TaskService {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private TaskRepository taskRepository;

    @Autowired
    private UltimateSpiderRepository ultimateSpiderRepository;

    @Override
    public Result getSpiderTask(Integer spiderId) {
        if (!ValidateUtils.validId(spiderId)) {
            return Result.fail(ResultCode.PARAMS_ERROR);
        }
        Task crawlTask = taskRepository.findOneBySpiderIdAndTaskType(spiderId, Task.TaskType.CRAWL);
        Task cleanTask = taskRepository.findOneBySpiderIdAndTaskType(spiderId, Task.TaskType.CLEAN);
        JSONObject spiderTask = new JSONObject();
        spiderTask.fluentPut(Constants.CRAWL_TASK, crawlTask).fluentPut(Constants.CLEAN_TASK, cleanTask);
        return Result.ok(spiderTask);
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Result saveTask(Task task) throws SchedulerException {
        if (!task.isValid(false)) {
            return Result.fail(ResultCode.PARAMS_ERROR);
        }
        if (task.getJobStatus() == null) {
            task.setJobStatus(Task.JobStatus.RUNNING);
        }
        Task flushedTask = taskRepository.saveAndFlush(task);
        UltimateSpider spider = ultimateSpiderRepository.findOne(task.getSpiderId());
        JobKey jobKey = JobUtils.generateJobKey(spider, task.getTaskType());
        if (task.getJobStatus() == Task.JobStatus.RUNNING) {
            TriggerKey triggerKey = JobUtils.generateTriggerKey(spider, task.getTaskType());
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withSchedule(CronScheduleBuilder.cronSchedule(task.getCronExpression()))
                    .withIdentity(triggerKey)
                    .build();
            if (scheduler.checkExists(jobKey)) {
                scheduler.rescheduleJob(triggerKey, trigger);
            } else {
                //noinspection unchecked
                JobDetail jobDetail = JobBuilder.newJob(task.getTaskType().getJobClass())
                        .withIdentity(jobKey)
                        .usingJobData(Constants.JOB_TASK_ID, flushedTask.getId())
                        .storeDurably()
                        .build();
                scheduler.scheduleJob(jobDetail, trigger);
            }
        } else if (task.getJobStatus() == Task.JobStatus.PAUSE) {
            if (scheduler.checkExists(jobKey)) {
                scheduler.pauseJob(jobKey);
            }
        }
        return Result.ok(flushedTask);
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Result pauseTask(Integer id) throws SchedulerException {
        if (!ValidateUtils.validId(id)) {
            return Result.fail(ResultCode.PARAMS_ERROR);
        }
        Task task = taskRepository.findOne(id);
        if (task == null || !task.isValid(false)) {
            return Result.fail(ResultCode.DATA_VALID_ERROR);
        }
        UltimateSpider spider = ultimateSpiderRepository.findOne(task.getSpiderId());
        JobKey jobKey = JobUtils.generateJobKey(spider, task.getTaskType());
        if (scheduler.checkExists(jobKey)) {
            scheduler.pauseJob(jobKey);
        }
        task.setJobStatus(Task.JobStatus.PAUSE);
        taskRepository.save(task);
        return Result.ok();
    }

    @Override
    public Result resumeTask(Integer id) throws SchedulerException {
        if (!ValidateUtils.validId(id)) {
            return Result.fail(ResultCode.PARAMS_ERROR);
        }
        Task task = taskRepository.findOne(id);
        if (task == null || !task.isValid(false)) {
            return Result.fail(ResultCode.DATA_VALID_ERROR);
        }
        resumeTask(task);
        task.setJobStatus(Task.JobStatus.RUNNING);
        taskRepository.save(task);
        return Result.ok();
    }

    @Override
    public void resumeTask(Task task) throws SchedulerException {
        UltimateSpider spider = ultimateSpiderRepository.findOne(task.getSpiderId());
        JobKey jobKey = JobUtils.generateJobKey(spider, task.getTaskType());
        if (scheduler.checkExists(jobKey)) {
            scheduler.resumeJob(jobKey);
        } else {
            //noinspection unchecked
            JobDetail jobDetail = JobBuilder.newJob(task.getTaskType().getJobClass())
                    .withIdentity(jobKey)
                    .usingJobData(Constants.JOB_TASK_ID, task.getId())
                    .storeDurably()
                    .build();
            TriggerKey triggerKey = JobUtils.generateTriggerKey(spider, task.getTaskType());
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withSchedule(CronScheduleBuilder.cronSchedule(task.getCronExpression()))
                    .withIdentity(triggerKey)
                    .build();
            scheduler.scheduleJob(jobDetail, trigger);
        }
    }
}
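
JobUtils is not shown in this article. A minimal sketch of what its key generation could look like, assuming the job and trigger identities are simply derived from the spider name and the task type (JobKey.jobKey and TriggerKey.triggerKey are Quartz's own factory methods; everything else is an assumption):

// Hypothetical sketch: stable Quartz identities derived from spider name and task type,
// so that rescheduleJob / pauseJob / resumeJob can address the same job later.
public final class JobUtils {

    private JobUtils() {
    }

    public static JobKey generateJobKey(UltimateSpider spider, Task.TaskType taskType) {
        // e.g. job "my-spider" in group "CRAWL"
        return JobKey.jobKey(spider.getSpiderName(), taskType.name());
    }

    public static TriggerKey generateTriggerKey(UltimateSpider spider, Task.TaskType taskType) {
        return TriggerKey.triggerKey(spider.getSpiderName(), taskType.name());
    }
}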

Data Cleaning

Data cleaning tasks are also scheduled with Quartz and ensure the quality of the crawled data. The cleaning rules can likewise be configured on the page; an example configuration is sketched after the listing below.

  • CleanerJob.java: the cleaning job
@Slf4j
public class CleanerJob implements Job {

    @Autowired
    private UltimateSpiderRepository ultimateSpiderRepository;

    @Autowired
    private TaskRepository taskRepository;

    @Autowired
    private CleanerRepository cleanerRepository;

    @Autowired
    private MongoTemplate mongoTemplate;

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        Integer taskId = (Integer) jobExecutionContext.getJobDetail().getJobDataMap().get(Constants.JOB_TASK_ID);
        Task task = taskRepository.findOne(taskId);
        UltimateSpider ultimateSpider = ultimateSpiderRepository.findOne(task.getSpiderId());
        Cleaner cleaner = cleanerRepository.findOneBySpiderId(task.getSpiderId());
        if (cleaner == null || !cleaner.isValid(true)) {
            log.warn("[{}] cleaner is invalid", ultimateSpider.getSpiderName());
            return;
        }
        log.info("[{}] cleaner start", ultimateSpider.getSpiderName());
        List<JSONObject> purgeList = Lists.newArrayList();
        List<JSONObject> mergeList = Lists.newArrayList();
        for (Cleaner.CleanerRule cleanerRule : cleaner.getCleanerRules()) {
            if (cleanerRule.getCleanType() == Cleaner.CleanType.MERGE) {
                List<JSONObject> queryResult = mongoTemplate.find(new BasicQuery(cleanerRule.getQueryRule()),
                        JSONObject.class, ultimateSpider.getSpiderName());
                // group documents that share the same values for all distinct keys
                Map<String, List<JSONObject>> mergeMap = Maps.newHashMap();
                queryResult.forEach(result -> {
                    List<String> keyItems = Lists.newLinkedList();
                    for (String distinctKey : cleanerRule.getDistinctKeys()) {
                        StringBuilder keyItemBuilder = new StringBuilder();
                        if (!result.containsKey(distinctKey)) {
                            break;
                        }
                        keyItemBuilder.append(distinctKey).append(Constants.MERGE_KEY_DELIMITER).append(result.get(distinctKey));
                        keyItems.add(keyItemBuilder.toString());
                    }
                    if (keyItems.size() == cleanerRule.getDistinctKeys().size()) {
                        String mergeKey = StringUtils.join(keyItems, Constants.MERGE_VALUE_DELIMITER);
                        List<JSONObject> resultMergeList = mergeMap.getOrDefault(mergeKey, Lists.newArrayList());
                        resultMergeList.add(result);
                        // register the group the first time this key is seen
                        mergeMap.putIfAbsent(mergeKey, resultMergeList);
                    }
                });
                // for each group with duplicates, fold the merge-key values into the first document
                mergeMap.forEach((mergeKey, resultMergeList) -> {
                    if (resultMergeList.size() > 1) {
                        purgeList.addAll(resultMergeList);
                        JSONObject mergeResult = resultMergeList.get(0);
                        resultMergeList.remove(0);
                        resultMergeList.forEach(result -> {
                            for (String resultMergeKey : cleanerRule.getMergeKeys()) {
                                if (mergeResult.get(resultMergeKey) instanceof JSONArray) {
                                    JSONArray jsonArray = (JSONArray) mergeResult.getOrDefault(resultMergeKey, new JSONArray());
                                    if (!jsonArray.contains(result.get(resultMergeKey))) {
                                        jsonArray.add(result.get(resultMergeKey));
                                        mergeResult.put(resultMergeKey, jsonArray);
                                    }
                                } else {
                                    JSONArray jsonArray = new JSONArray();
                                    jsonArray.add(mergeResult.get(resultMergeKey));
                                    jsonArray.add(result.get(resultMergeKey));
                                    mergeResult.put(resultMergeKey, jsonArray);
                                }
                            }
                        });
                        mergeList.add(mergeResult);
                    }
                });
            } else if (cleanerRule.getCleanType() == Cleaner.CleanType.PURGE) {
                purgeList.addAll(mongoTemplate.find(new BasicQuery(cleanerRule.getQueryRule()),
                        JSONObject.class, ultimateSpider.getSpiderName()));
            }
        }
        purgeList.forEach(purgeObject -> mongoTemplate.remove(purgeObject, ultimateSpider.getSpiderName()));
        mergeList.forEach(mergeObject -> mongoTemplate.save(mergeObject, ultimateSpider.getSpiderName()));
    }
}
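
As an illustration, a cleaner configuration for a spider could combine a MERGE rule with a PURGE rule as below. The query strings follow MongoDB's JSON query syntax used by BasicQuery; the Cleaner and CleanerRule setters are assumptions about the configuration beans and are only illustrative.

// Hypothetical cleaner configuration (all setters assumed): merge documents that share the
// same title and source, collecting their "tag" values, and purge documents with an empty title.
Cleaner.CleanerRule mergeRule = new Cleaner.CleanerRule();
mergeRule.setCleanType(Cleaner.CleanType.MERGE);
mergeRule.setQueryRule("{}");                                        // scan the whole collection
mergeRule.setDistinctKeys(Lists.newArrayList("title", "source"));    // documents equal on these keys are duplicates
mergeRule.setMergeKeys(Lists.newArrayList("tag"));                   // duplicated values are folded into arrays

Cleaner.CleanerRule purgeRule = new Cleaner.CleanerRule();
purgeRule.setCleanType(Cleaner.CleanType.PURGE);
purgeRule.setQueryRule("{\"title\": \"\"}");                         // remove documents with an empty title

Cleaner cleaner = new Cleaner();
cleaner.setSpiderId(1);
cleaner.setCleanerRules(Lists.newArrayList(mergeRule, purgeRule));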

Data Persistence

Crawl results are saved asynchronously to the MongoDB document database through the RabbitMQ message queue, which performs better and is a better fit for storing crawl results whose structure varies. The producer side of the queue is sketched after the listing below.

  • The message consumer stores crawl results in MongoDB
@Slf4j
@Component
@RabbitListener(queues = Constants.QUEUE_NAME)
public class Receiver {

    @Autowired
    private MongoTemplate mongoTemplate;

    @RabbitHandler
    public void receive(String spiderResultJson) {
        SpiderResult spiderResult = JSON.parseObject(spiderResultJson, SpiderResult.class);
        log.info("queue: {}, spider_result: {}", Constants.QUEUE_NAME, spiderResult);
        mongoTemplate.save(spiderResult.getContent(), spiderResult.getSpiderName());
    }
}
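
The producer side is not shown above. A minimal sketch of a WebMagic Pipeline that publishes results to the queue consumed by Receiver, assuming SpiderResult has standard setters and that a component named RabbitPipeline exists; the Pipeline, ResultItems and RabbitTemplate calls are the libraries' own APIs:

// Hypothetical producer: turns the fields put by SpiderProcessor into a SpiderResult
// and publishes it as JSON to the queue that Receiver listens on.
@Slf4j
@Component
public class RabbitPipeline implements Pipeline {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void process(ResultItems resultItems, Task task) {
        // Task here is WebMagic's us.codecraft.webmagic.Task, not the scheduling Task entity.
        if (resultItems.isSkip()) {
            return; // pages marked with page.setSkip(true) carry no result
        }
        SpiderResult spiderResult = new SpiderResult();                       // setters assumed
        spiderResult.setSpiderName(resultItems.get(Constants.SPIDER_NAME));
        spiderResult.setContent(resultItems.get(Constants.SPIDER_CONTENT));
        rabbitTemplate.convertAndSend(Constants.QUEUE_NAME, JSON.toJSONString(spiderResult));
    }
}

Such a pipeline would be registered with the spider, for example via Spider.create(processor).addPipeline(new RabbitPipeline()).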

Data Visualization

The collected data is visualized with Metabase, a relatively mature tool.

Metabase visualization of the collected data