介绍
介绍
福哥今天会带着大家完成网页蜘蛛的代码的最后一个部分,实现网页源代码内容写入MySQL数据库功能,实现网页信息写入ElasticSearch搜索引擎功能。这部分功能实现了之后,我们就可以转而去编写搜索引擎的前端UI功能的代码了。
要实现写入ElasticSearch就需要先连接ElasticSearch,并且需要设置一个索引名称。这些我们在前面已经准备好了,所以我们现在就可以编写相应的业务逻辑了。
实现全文搜索需要对每一个网页设置一个权重值,这个权重值在搜索结果排序时候是非常重要的。通常情况下页面权重就是所属域名的权重,我们只需要根据一些算法更新这个权重即可。
Spider对象
属性
es
es = None  # Elasticsearch client handle; created in open()
esIndex
esIndex = "tfse"  # name of the Elasticsearch index the spider writes to
esDocType
esDocType = "all_type"  # Elasticsearch mapping/document type (pre-7.x style)
方法
open
def open(self):
    """Connect to MySQL and Elasticsearch.

    Creates self.mysqlCN / self.mysqlCS and self.es. Terminates the
    process if either backend is unreachable, since the spider cannot
    do anything useful without them.
    """
    print("Start")
    try:
        # FIX: PyMySQL 1.0 removed positional connect() arguments; use
        # keyword arguments, and select the schema via `database` instead
        # of a separate select_db() call.
        self.mysqlCN = pymysql.connect(host="192.168.2.168",
                                       user="tfse",
                                       password="abcdef",
                                       database="tfse")
        self.mysqlCS = self.mysqlCN.cursor()
    except Exception as e:
        print("Failed to connect db")
        print(e)
        # raise SystemExit instead of exit(): the latter is a site-module
        # convenience that may be absent (frozen apps, python -S).
        raise SystemExit(1)
    try:
        self.es = Elasticsearch(host="192.168.2.168", port=9200, timeout=60)
    except Exception as e:
        print("Failed to connect es")
        print(e)
        raise SystemExit(1)
analyzePage
def analyzePage(self, url, weight): title = self.chrome.title keywords = "" description = "" charset = "" pageSource = self.chrome.page_source.encode("gbk", "ignore") pageSource = pageSource.decode("gbk") stRE = re.compile("<\/?[a-z0-9]+[^\>]*>", re.M | re.I) scriptRE = re.compile("<(script).*?>[\s\S]*?<\/\1>", re.M | re.I) bcRE = re.compile("[\s\t\r\n]+", re.M | re.I) pageText = self.chrome.page_source.encode("gbk", "ignore") pageText = re.sub(stRE, " ", pageText) pageText = re.sub(scriptRE, " ", pageText) pageText = re.sub(bcRE, " ", pageText) pageText = pageText.decode("gbk") metas = self.chrome.find_elements_by_tag_name("meta") for meta in metas: myName = meta.get_attribute("name") myContent = meta.get_attribute("content") myHttpEquiv = meta.get_attribute("http-equiv") if myName == "keywords": keywords = myContent elif myName == "description": description = myContent elif myHttpEquiv is not None and myHttpEquiv.lower() == "content-type": myCharset = myContent csRE = re.compile("charset\=([^\;]+)", re.M | re.I) mats = csRE.search(myCharset) if mats: charset=mats.group(1).lower() return { "url": url, "title": title, "keywords": keywords, "description": description, "charset": charset, "pageSource": pageSource, "pageText": pageText, "weight": weight, "statusCode": 200 }
updateWebPage
def updateWebPage(self, webpageId, args):
    """Persist an analyzed page produced by analyzePage().

    Updates the webpages row, inserts-or-updates the raw source and plain
    text in webpage_data, then inserts-or-updates the page's Elasticsearch
    document (matched on the "webpageid" field).

    Args:
        webpageId: primary key of the page in the webpages table.
        args: dict from analyzePage() (url/title/keywords/description/
              pageSource/pageText/weight/statusCode).

    Returns:
        webpageId on success, None on any failure.
    """
    try:
        # update webpages
        # FIX: pass str values directly; pre-encoding to UTF-8 bytes made
        # the Elasticsearch JSON body unserializable and is unnecessary
        # for PyMySQL, which encodes str parameters itself.
        print("更新网页信息: " + args["url"])
        self.mysqlCS.execute(
            "UPDATE webpages SET title = %s, keywords = %s, description = %s WHERE webpageId = %s",
            [args["title"], args["keywords"], args["description"], webpageId])
        self.mysqlCN.commit()
        # insert-or-update webpage_data
        self.mysqlCS.execute("SELECT * FROM webpage_data WHERE webpageId = %s", [webpageId])
        row = self.mysqlCS.fetchone()
        if row is None:
            print("建立网页数据: " + args["url"])
            self.mysqlCS.execute(
                "INSERT INTO webpage_data (webpageId, sourceCode, pageText) VALUES (%s, %s, %s)",
                [webpageId, args["pageSource"], args["pageText"]])
        else:
            print("更新网页数据: " + args["url"])
            self.mysqlCS.execute(
                "UPDATE webpage_data SET sourceCode = %s, pageText = %s WHERE webpageId = %s",
                [args["pageSource"], args["pageText"], webpageId])
        self.mysqlCN.commit()
        # insert-or-update the Elasticsearch document; the same payload is
        # used on both paths (the original duplicated this dict literal).
        doc = {
            "webpageid": webpageId,
            "title": args["title"],
            "keywords": args["keywords"],
            "description": args["description"],
            "pageText": args["pageText"],
            "weight": args["weight"],
            "statusCode": args["statusCode"],
        }
        try:
            query = {"query": {"term": {"webpageid": webpageId}}}
            results = self.es.search(index=self.esIndex, doc_type=self.esDocType, body=query)
        except Exception:
            # search failure (e.g. index not created yet) falls through to
            # the index() branch below — deliberate best-effort.
            results = None
        if results is not None and results['hits']['total'] == 1:
            myId = results['hits']['hits'][0]['_id']
            # FIX: create/update log messages were swapped in the original.
            print("更新网页索引: " + args["url"])
            self.es.update(index=self.esIndex, doc_type=self.esDocType, id=myId, body={"doc": doc})
        else:
            print("建立网页索引: " + args["url"])
            self.es.index(index=self.esIndex, doc_type=self.esDocType, body=doc)
        return webpageId
    except Exception as e:
        print("Failed to updateWebPage")
        print(e)
        return None
updateDomainDT
def updateDomainDT(self, websiteId, domainName):
    """Best-effort refresh of a domain's fetch timestamps.

    Sets lastFetchDT to now and nextFetchDT to 30 days from now, so the
    scheduler spreads re-crawls of this domain out by a month.
    """
    try:
        nextFetchDT = datetime.datetime.now() + datetime.timedelta(days=30)
        print("更新域名时间: " + domainName)
        self.mysqlCS.execute(
            "UPDATE websites SET lastFetchDT = now(), nextFetchDT = %s WHERE websiteId = %s",
            [nextFetchDT, websiteId])
        self.mysqlCN.commit()
    except Exception as e:
        # FIX: the original assigned the exception to a dummy variable and
        # dropped it; keep the best-effort contract but at least log it.
        print("Failed to updateDomainDT: " + str(e))
updateWebDT
def updateWebDT(self, webpageId, url):
    """Best-effort update of a page's lastFetchDT timestamp."""
    try:
        print("更新网页时间: " + url)
        # FIX: the original filtered on websiteId while passing webpageId;
        # the webpages table is keyed by webpageId (see updateWebPage), so
        # the timestamp row was never matched.
        self.mysqlCS.execute(
            "UPDATE webpages SET lastFetchDT = now() WHERE webpageId = %s",
            [webpageId])
        self.mysqlCN.commit()
    except Exception as e:
        # best-effort, but don't swallow the error silently
        print("Failed to updateWebDT: " + str(e))
主程序
from lib.Spider import *


def main():
    """One crawl pass: fetch up to 10 pending domains and, for each,
    up to 100 pending pages, updating fetch timestamps as we go."""
    mySpider = Spider()
    mySpider.open()
    # `or []` collapses the None-check the original spelled out explicitly.
    for domain in mySpider.getPendingDomains(10) or []:
        mySpider.fetchDomainURL(domain[0], domain[1])
        for page in mySpider.getPendingPages(domain[1], 100) or []:
            mySpider.fetchDomainPageURL(domain[0], domain[1], page[0], page[2], domain[3])
            mySpider.updateWebDT(page[0], page[2])
        mySpider.updateDomainDT(domain[0], domain[1])
    mySpider.close()


if __name__ == "__main__":
    main()
讲解
属性es
操作elasticsearch搜索引擎的句柄
属性esIndex
我们设定的elasticsearch的索引名称
属性esDocType
我们设置的elasticsearch的文档类型
open
增加了连接elasticsearch搜索引擎的处理
analyzePage
增加了weight数据和statusCode数据
updateWebPage
增加了将网页源代码和纯文字内容写入webpage_data数据表的处理
updateDomainDT
将域名的nextFetchDT更新为当前时间之后30天的时间
更新域名的lastFetchDT的时间
updateWebDT
更新网页的lastFetchDT的时间
总结
现在网页蜘蛛的基本功能已经完全实现了,我们只要保证网页蜘蛛程序不间断的反复执行就可以实现爬取互联网网站页面的目的了,系统会根据lastFetchDT和nextFetchDT保证系统当中的域名会被相对平均的分配到采集的机会了。
福哥再次声明一下,为了给大家提供一个动脑筋的机会,福哥不会提供完整的Spider对象的代码。不过,大家如果编写代码的过程中遇到了问题,可以关注“同福编程”公众号,在里面给福哥留言,福哥会在近期开通一个“站内消息”功能,大家可以给“鬼谷子叔叔”留言。
P.S.
微信公众号的文章发出去之后是不能编辑的,但是福哥偶尔会修复一些描述不到位、示例不正确、结构不清晰等等的文章错误,这些只能在网站上才能看到最新版本内容,望大家知晓~~