Ultimate Spider (终极蜘蛛王) is a tool for collecting data from the web. This article describes its overall architecture and some of its technical highlights.
Ultimate Spider has two main pages: a dashboard that presents overview information, and a spider configuration page.
The overall architecture is shown in the figure below:
Spider Configuration
Building on the WebMagic crawler framework, both the initial URLs and the content to extract can be configured visually, which makes crawl rules more flexible and applicable to a wider range of sites. (A sketch of how a configured processor is launched follows the class below.)
- SpiderProcessor.java: crawls pages according to the configured crawl rules
```java
@Slf4j
@EnableConfigurationProperties({SpiderProperties.class})
public class SpiderProcessor implements PageProcessor {

    private SpiderProperties spiderProperties;
    private String spiderName;
    private Crawler crawler;

    public SpiderProcessor(SpiderProperties spiderProperties, String spiderName, Crawler crawler) {
        this.spiderProperties = spiderProperties;
        this.spiderName = spiderName;
        this.crawler = crawler;
    }

    @Override
    public void process(Page page) {
        if (CollectionUtils.isNotEmpty(crawler.getCrawlConfigs())) {
            page.putField(Constants.SPIDER_NAME, spiderName);
            JSONObject content = new JSONObject();
            for (Crawler.CrawlConfig crawlConfig : crawler.getCrawlConfigs()) {
                if (CollectionUtils.isNotEmpty(crawlConfig.getCrawlRules())) {
                    Selectable selector = page.getHtml();
                    for (Crawler.CrawlRule crawlRule : crawlConfig.getCrawlRules()) {
                        switch (crawlRule.getCrawlType()) {
                            case XPATH:
                                selector = selector.xpath(crawlRule.getRule());
                                break;
                            case CSS:
                                if (StringUtils.isBlank(crawlRule.getAttr())) {
                                    selector = selector.css(crawlRule.getRule());
                                } else {
                                    selector = selector.css(crawlRule.getRule(), crawlRule.getAttr());
                                }
                                break;
                            case LINKS:
                                selector = selector.links();
                                break;
                            case REGEX:
                                selector = selector.regex(crawlRule.getRule());
                                break;
                            case REGEX_WITH_GROUP:
                                selector = selector.regex(crawlRule.getRule(), crawlRule.getGroup());
                                break;
                            case REPLACE:
                                selector = selector.replace(crawlRule.getRule(), crawlRule.getReplacement());
                                break;
                            default:
                                log.warn("not support crawl rule type: {}", crawlRule.getCrawlType());
                        }
                    }
                    if (crawlConfig.isMultiResult()) {
                        List<String> value = selector.all();
                        if (crawlConfig.isNullSkip() && CollectionUtils.isEmpty(value)) {
                            page.setSkip(true);
                            break;
                        }
                        if (crawlConfig.getCrawlResultType() == Crawler.CrawlResultType.TARGET_URL) {
                            page.addTargetRequests(value);
                        } else if (crawlConfig.getCrawlResultType() == Crawler.CrawlResultType.TEXT) {
                            content.put(crawlConfig.getCrawlKey(), value);
                        } else {
                            log.warn("not support crawl result type: {}", crawlConfig.getCrawlResultType());
                        }
                    } else {
                        String value = selector.get();
                        if (crawlConfig.isNullSkip() && StringUtils.isBlank(value)) {
                            page.setSkip(true);
                            break;
                        }
                        if (crawlConfig.getCrawlResultType() == Crawler.CrawlResultType.TARGET_URL) {
                            page.addTargetRequest(value);
                        } else if (crawlConfig.getCrawlResultType() == Crawler.CrawlResultType.TEXT) {
                            content.put(crawlConfig.getCrawlKey(), value);
                        } else {
                            log.warn("not support crawl result type: {}", crawlConfig.getCrawlResultType());
                        }
                    }
                }
            }
            content.put(Constants.CRAWL_AT, new Date());
            page.putField(Constants.SPIDER_CONTENT, content);
        }
    }

    @Override
    public Site getSite() {
        return Site.me()
                .setRetryTimes(spiderProperties.getRetryTimes())
                .setRetrySleepTime(spiderProperties.getRetrySleepTime())
                .setSleepTime(spiderProperties.getSleepTime())
                .setTimeOut(spiderProperties.getTimeout());
    }
}
```
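For orientation, here is a minimal sketch of how a processor built from such a configuration could be started with WebMagic's Spider API. The start URL, thread count and RabbitMqPipeline are illustrative assumptions, not code taken from Ultimate Spider:

```java
// Minimal sketch: running a visually configured SpiderProcessor with WebMagic.
// spiderProperties and crawler are assumed to come from the saved configuration;
// RabbitMqPipeline is a hypothetical pipeline that publishes results to RabbitMQ
// (see the persistence section below).
SpiderProcessor processor = new SpiderProcessor(spiderProperties, "example_spider", crawler);
Spider.create(processor)
        .addUrl("https://example.com/start")  // initial URL from the spider configuration
        .addPipeline(new RabbitMqPipeline())
        .thread(4)
        .run();
```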
Spider Execution
The Quartz scheduling framework is used to schedule spider tasks, which can be paused and resumed at any time. A short usage sketch follows the service class below.
- TaskServiceImpl.java: service implementation for spider tasks
```java
@Slf4j
@Service
public class TaskServiceImpl implements TaskService {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private TaskRepository taskRepository;

    @Autowired
    private UltimateSpiderRepository ultimateSpiderRepository;

    @Override
    public Result getSpiderTask(Integer spiderId) {
        if (!ValidateUtils.validId(spiderId)) {
            return Result.fail(ResultCode.PARAMS_ERROR);
        }
        Task crawlTask = taskRepository.findOneBySpiderIdAndTaskType(spiderId, Task.TaskType.CRAWL);
        Task cleanTask = taskRepository.findOneBySpiderIdAndTaskType(spiderId, Task.TaskType.CLEAN);
        JSONObject spiderTask = new JSONObject();
        spiderTask.fluentPut(Constants.CRAWL_TASK, crawlTask).fluentPut(Constants.CLEAN_TASK, cleanTask);
        return Result.ok(spiderTask);
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Result saveTask(Task task) throws SchedulerException {
        if (!task.isValid(false)) {
            return Result.fail(ResultCode.PARAMS_ERROR);
        }
        if (task.getJobStatus() == null) {
            task.setJobStatus(Task.JobStatus.RUNNING);
        }
        Task flushedTask = taskRepository.saveAndFlush(task);
        UltimateSpider spider = ultimateSpiderRepository.findOne(task.getSpiderId());
        JobKey jobKey = JobUtils.generateJobKey(spider, task.getTaskType());
        if (task.getJobStatus() == Task.JobStatus.RUNNING) {
            TriggerKey triggerKey = JobUtils.generateTriggerKey(spider, task.getTaskType());
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withSchedule(CronScheduleBuilder.cronSchedule(task.getCronExpression()))
                    .withIdentity(triggerKey)
                    .build();
            if (scheduler.checkExists(jobKey)) {
                scheduler.rescheduleJob(triggerKey, trigger);
            } else {
                JobDetail jobDetail = JobBuilder.newJob(task.getTaskType().getJobClass())
                        .withIdentity(jobKey)
                        .usingJobData(Constants.JOB_TASK_ID, flushedTask.getId())
                        .storeDurably()
                        .build();
                scheduler.scheduleJob(jobDetail, trigger);
            }
        } else if (task.getJobStatus() == Task.JobStatus.PAUSE) {
            if (scheduler.checkExists(jobKey)) {
                scheduler.pauseJob(jobKey);
            }
        }
        return Result.ok(flushedTask);
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Result pauseTask(Integer id) throws SchedulerException {
        if (!ValidateUtils.validId(id)) {
            return Result.fail(ResultCode.PARAMS_ERROR);
        }
        Task task = taskRepository.findOne(id);
        if (task == null || !task.isValid(false)) {
            return Result.fail(ResultCode.DATA_VALID_ERROR);
        }
        UltimateSpider spider = ultimateSpiderRepository.findOne(task.getSpiderId());
        JobKey jobKey = JobUtils.generateJobKey(spider, task.getTaskType());
        if (scheduler.checkExists(jobKey)) {
            scheduler.pauseJob(jobKey);
        }
        task.setJobStatus(Task.JobStatus.PAUSE);
        taskRepository.save(task);
        return Result.ok();
    }

    @Override
    public Result resumeTask(Integer id) throws SchedulerException {
        if (!ValidateUtils.validId(id)) {
            return Result.fail(ResultCode.PARAMS_ERROR);
        }
        Task task = taskRepository.findOne(id);
        if (task == null || !task.isValid(false)) {
            return Result.fail(ResultCode.DATA_VALID_ERROR);
        }
        resumeTask(task);
        task.setJobStatus(Task.JobStatus.RUNNING);
        taskRepository.save(task);
        return Result.ok();
    }

    @Override
    public void resumeTask(Task task) throws SchedulerException {
        UltimateSpider spider = ultimateSpiderRepository.findOne(task.getSpiderId());
        JobKey jobKey = JobUtils.generateJobKey(spider, task.getTaskType());
        if (scheduler.checkExists(jobKey)) {
            scheduler.resumeJob(jobKey);
        } else {
            JobDetail jobDetail = JobBuilder.newJob(task.getTaskType().getJobClass())
                    .withIdentity(jobKey)
                    .usingJobData(Constants.JOB_TASK_ID, task.getId())
                    .storeDurably()
                    .build();
            TriggerKey triggerKey = JobUtils.generateTriggerKey(spider, task.getTaskType());
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withSchedule(CronScheduleBuilder.cronSchedule(task.getCronExpression()))
                    .withIdentity(triggerKey)
                    .build();
            scheduler.scheduleJob(jobDetail, trigger);
        }
    }
}
```
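As a short usage sketch (the setter names are assumptions based on the getters used above, and the spider id and cron expression are made up), scheduling an hourly crawl for a spider and pausing or resuming it would look roughly like this:

```java
// Illustrative only: schedule a crawl task that fires at the top of every hour,
// then pause and later resume it. Setter names are assumed from the getters above.
Task crawlTask = new Task();
crawlTask.setSpiderId(1);
crawlTask.setTaskType(Task.TaskType.CRAWL);
crawlTask.setCronExpression("0 0 * * * ?");   // Quartz cron: every hour on the hour
taskService.saveTask(crawlTask);              // persists the task and registers the Quartz job + trigger

// later, e.g. from the UI (the id is assigned when the task is saved):
taskService.pauseTask(crawlTask.getId());     // pauses the Quartz job, status -> PAUSE
taskService.resumeTask(crawlTask.getId());    // resumes or re-schedules it, status -> RUNNING
```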
Data Cleaning
Data cleaning tasks are also scheduled with Quartz to ensure the quality of the crawled data. Cleaning rules can likewise be configured in the UI; an illustration of a purge query follows the job class below.
```java
@Slf4j
public class CleanerJob implements Job {

    @Autowired
    private UltimateSpiderRepository ultimateSpiderRepository;

    @Autowired
    private TaskRepository taskRepository;

    @Autowired
    private CleanerRepository cleanerRepository;

    @Autowired
    private MongoTemplate mongoTemplate;

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        Integer taskId = (Integer) jobExecutionContext.getJobDetail().getJobDataMap().get(Constants.JOB_TASK_ID);
        Task task = taskRepository.findOne(taskId);
        UltimateSpider ultimateSpider = ultimateSpiderRepository.findOne(task.getSpiderId());
        Cleaner cleaner = cleanerRepository.findOneBySpiderId(task.getSpiderId());
        if (cleaner == null || !cleaner.isValid(true)) {
            log.warn("[{}] cleaner is invalid", ultimateSpider.getSpiderName());
            return;
        }
        log.info("[{}] cleaner start", ultimateSpider.getSpiderName());
        List<JSONObject> purgeList = Lists.newArrayList();
        List<JSONObject> mergeList = Lists.newArrayList();
        for (Cleaner.CleanerRule cleanerRule : cleaner.getCleanerRules()) {
            if (cleanerRule.getCleanType() == Cleaner.CleanType.MERGE) {
                List<JSONObject> queryResult = mongoTemplate.find(new BasicQuery(cleanerRule.getQueryRule()),
                        JSONObject.class, ultimateSpider.getSpiderName());
                // Group query results by the combination of their distinct-key values
                Map<String, List<JSONObject>> mergeMap = Maps.newHashMap();
                queryResult.forEach(result -> {
                    List<String> keyItems = Lists.newLinkedList();
                    for (String distinctKey : cleanerRule.getDistinctKeys()) {
                        if (!result.containsKey(distinctKey)) {
                            break;
                        }
                        keyItems.add(distinctKey + Constants.MERGE_KEY_DELIMITER + result.get(distinctKey));
                    }
                    // Only records containing every distinct key take part in merging
                    if (keyItems.size() == cleanerRule.getDistinctKeys().size()) {
                        String mergeKey = StringUtils.join(keyItems, Constants.MERGE_VALUE_DELIMITER);
                        mergeMap.computeIfAbsent(mergeKey, key -> Lists.newArrayList()).add(result);
                    }
                });
                mergeMap.forEach((mergeKey, resultMergeList) -> {
                    if (resultMergeList.size() > 1) {
                        // every duplicate is queued for removal; the merged record is saved back below
                        purgeList.addAll(resultMergeList);
                        JSONObject mergeResult = resultMergeList.get(0);
                        resultMergeList.remove(0);
                        resultMergeList.forEach(result -> {
                            for (String resultMergeKey : cleanerRule.getMergeKeys()) {
                                if (mergeResult.get(resultMergeKey) instanceof JSONArray) {
                                    JSONArray jsonArray = (JSONArray) mergeResult.get(resultMergeKey);
                                    if (!jsonArray.contains(result.get(resultMergeKey))) {
                                        jsonArray.add(result.get(resultMergeKey));
                                        mergeResult.put(resultMergeKey, jsonArray);
                                    }
                                } else {
                                    JSONArray jsonArray = new JSONArray();
                                    jsonArray.add(mergeResult.get(resultMergeKey));
                                    jsonArray.add(result.get(resultMergeKey));
                                    mergeResult.put(resultMergeKey, jsonArray);
                                }
                            }
                        });
                        mergeList.add(mergeResult);
                    }
                });
            } else if (cleanerRule.getCleanType() == Cleaner.CleanType.PURGE) {
                purgeList.addAll(mongoTemplate.find(new BasicQuery(cleanerRule.getQueryRule()),
                        JSONObject.class, ultimateSpider.getSpiderName()));
            }
        }
        purgeList.forEach(purgeObject -> mongoTemplate.remove(purgeObject, ultimateSpider.getSpiderName()));
        mergeList.forEach(mergeObject -> mongoTemplate.save(mergeObject, ultimateSpider.getSpiderName()));
    }
}
```
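The queryRule of a cleaning rule is a plain MongoDB query document. As a minimal illustration (the query and collection name are made-up examples, not taken from Ultimate Spider), a PURGE rule whose queryRule is {"title": ""} boils down to the following:

```java
// Illustration of a PURGE rule: select and remove every record with an empty title.
// "news_spider" stands for the collection named after the spider; the query is an example.
List<JSONObject> toPurge = mongoTemplate.find(
        new BasicQuery("{\"title\": \"\"}"), JSONObject.class, "news_spider");
toPurge.forEach(doc -> mongoTemplate.remove(doc, "news_spider"));
```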
Data Persistence
Crawl results are saved asynchronously to MongoDB, a document database, through a RabbitMQ message queue. This improves performance and suits results whose structure varies from spider to spider. A sketch of the producing side follows the consumer class below.
```java
@Slf4j
@Component
@RabbitListener(queues = Constants.QUEUE_NAME)
public class Receiver {

    @Autowired
    private MongoTemplate mongoTemplate;

    @RabbitHandler
    public void receive(String spiderResultJson) {
        SpiderResult spiderResult = JSON.parseObject(spiderResultJson, SpiderResult.class);
        log.info("queue: {}, spider_result: {}", Constants.QUEUE_NAME, spiderResult);
        mongoTemplate.save(spiderResult.getContent(), spiderResult.getSpiderName());
    }
}
```
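The Receiver above is only the consuming end. As a rough sketch of what the producing side might look like (the class name, message fields and queue wiring are assumptions; Ultimate Spider's actual pipeline is not shown in this article), a WebMagic Pipeline could serialize each result and publish it through RabbitTemplate:

```java
// Hypothetical producer: a WebMagic Pipeline that publishes crawl results to the queue
// consumed by Receiver. Only the WebMagic and Spring AMQP APIs are real; the rest is assumed.
@Component
public class RabbitMqPipeline implements Pipeline {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void process(ResultItems resultItems, Task task) {   // us.codecraft.webmagic.Task, not the scheduling entity
        if (resultItems.isSkip()) {
            return;   // pages marked with page.setSkip(true) are not persisted
        }
        JSONObject message = new JSONObject();
        message.put("spiderName", resultItems.get(Constants.SPIDER_NAME));
        message.put("content", resultItems.get(Constants.SPIDER_CONTENT));
        // default exchange, routing key equal to the queue name
        rabbitTemplate.convertAndSend(Constants.QUEUE_NAME, JSON.toJSONString(message));
    }
}
```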
Data Visualization
The relatively mature Metabase is used to visualize the collected data.