In this series of articles, we will use tools such as Apache Spark, XGBoost, Elasticsearch, and MySQL to build a Machine Learning Pipeline for a recommender system. A recommender system can be roughly divided into two stages: Candidate Generation and Ranking. The former generates a set of candidate items for a given user, with common approaches including Collaborative Filtering, Content-based filtering, tag matching, popularity charts, and manual curation; the latter ranks those candidates and presents the final recommendations as a Top N list, commonly with a method such as Logistic Regression.
In this article, we take Content-based recommendation, one of the common Candidate Generation approaches, as an example, and use Elasticsearch's More Like This query to build a recommender system for GitHub repositories: the repos a user has recently starred serve as the input, and other repos similar to them become the candidate item set.
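To give a concrete feel for it before we start, here is a minimal sketch of such a More Like This query using the official Python client. The index name, mapping type, and document id are placeholders matching what we will define later in this article; treat it as an illustration rather than the final code.

from elasticsearch import Elasticsearch

es = Elasticsearch(['http://127.0.0.1:9200'])

# Find documents similar to the repos the user starred, where each
# starred repo is referenced by its Elasticsearch document id.
response = es.search(index='repo', body={
    'query': {
        'more_like_this': {
            'fields': ['description', 'topics'],
            'like': [
                {'_index': 'repo', '_type': 'repo_info_doc', '_id': '26152923'},
            ],
            'min_term_freq': 1,
            'max_query_terms': 20,
        }
    }
})
for hit in response['hits']['hits']:
    print(hit['_id'], hit['_score'])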
As an aside, I originally planned to use Spark to turn each repo's text into Word2Vec vectors and precompute the pairwise similarities between repos (a so-called Similarity Join). But computing similarities among that many repos turned out to be far too expensive in both time and machines, and even Approximate Nearest Neighbor Search with DIMSUM and Locality Sensitive Hashing (LSH) didn't give great results. Then it hit me: finding similar or related items is exactly what a search engine does. So I simply dumped all the repo data into Elasticsearch, used document ids as the query condition, and a single More Like This query solved it. Lovely. After all, not everything has to be done inside Spark.
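For reference, here is a minimal PySpark sketch of what that Similarity Join with MinHash LSH could look like. The toy repo_df below is made up, and the actual project uses the Scala API; this is only meant to illustrate the abandoned approach.

from pyspark.ml.feature import HashingTF, MinHashLSH, Tokenizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('RepoSimilarityJoin').getOrCreate()

# A made-up toy dataset: one row per repo with its text content.
repo_df = spark.createDataFrame([
    (1, 'a powerful web framework written in python'),
    (2, 'a minimal web framework for python'),
    (3, 'distributed stream processing in scala'),
], ['repo_id', 'text'])

words_df = Tokenizer(inputCol='text', outputCol='words').transform(repo_df)
features_df = HashingTF(inputCol='words', outputCol='features').transform(words_df)

mh = MinHashLSH(inputCol='features', outputCol='hashes', numHashTables=5)
model = mh.fit(features_df)

# Approximate self-join: every pair of repos whose Jaccard distance
# falls below the threshold.
model.approxSimilarityJoin(features_df, features_df, 0.6, distCol='distance').show()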
The complete code can be found at https://github.com/vinta/albedo.
Articles in this series:
- Build a recommender system with Spark: Implicit ALS
- Build a recommender system with Spark: Content-based and Elasticsearch
- Build a recommender system with Spark: Logistic Regression
- Common methods in Feature Engineering
- Spark ML cookbook (Scala)
- Spark SQL cookbook (Scala)
- More to come
Set Up Elasticsearch
To keep things simple, we use the official Docker image directly. Also note that Elasticsearch 5.x/6.x introduced quite a few changes compared with earlier versions, such as X-Pack, the high-level REST client, and the rule that each index may only contain a single mapping type going forward. It's worth skimming the documentation when you get a chance.
# in elasticsearch.yml
bootstrap.memory_lock: true
cluster.name: albedo
discovery.type: single-node
http.host: 0.0.0.0
node.name: ${HOSTNAME}
xpack.security.enabled: false
# in docker-compose.yml
version: "3"
services:
  django:
    build: .
    hostname: django
    working_dir: /app
    env_file: .docker-assets/django.env
    command: .docker-assets/django_start.sh
    ports:
      - 8000:8000
    volumes:
      - ".:/app"
      - "../albedo-vendors/bin:/usr/local/bin"
      - "../albedo-vendors/dist-packages:/usr/local/lib/python3.5/dist-packages"
    links:
      - mysql
      - elasticsearch
  mysql:
    image: vinta/mysql:5.7
    hostname: mysql
    env_file: .docker-assets/mysql.env
    command: mysqld --character-set-server=utf8 --collation-server=utf8_unicode_ci
    ports:
      - 3306:3306
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:5.6.2
    ports:
      - 9200:9200
      - 9300:9300
    volumes:
      - "./.docker-assets/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml"
    environment:
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
$ docker-compose up
Your Elasticsearch cluster is then available at http://127.0.0.1:9200/.
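A quick way to verify the cluster is up:

$ curl http://127.0.0.1:9200/

This should return a JSON blob containing the cluster name and the Elasticsearch version.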
ref:
https://www.elastic.co/guide/en/elasticsearch/reference/5.6/docker.html
https://www.elastic.co/guide/en/elasticsearch/reference/5.6/security-settings.html
Define the Mapping (Data Schema)
Here we use elasticsearch-dsl-py to define an index and a mapping type.
from elasticsearch.helpers import bulk
from elasticsearch_dsl import analyzer
from elasticsearch_dsl import Date, Integer, Keyword, Text, Boolean
from elasticsearch_dsl import Index, DocType
from elasticsearch_dsl.connections import connections

client = connections.create_connection(hosts=['elasticsearch'])

repo_index = Index('repo')
repo_index.settings(
    number_of_shards=1,
    number_of_replicas=0
)

text_analyzer = analyzer(
    'text_analyzer',
    char_filter=["html_strip"],
    tokenizer="standard",
    filter=["asciifolding", "lowercase", "snowball", "stop"]
)
repo_index.analyzer(text_analyzer)

@repo_index.doc_type
class RepoInfoDoc(DocType):
    owner_id = Keyword()
    owner_username = Keyword()
    owner_type = Keyword()
    name = Text(analyzer=text_analyzer, fields={'raw': Keyword()})
    full_name = Text(analyzer=text_analyzer, fields={'raw': Keyword()})
    description = Text(analyzer=text_analyzer)
    language = Keyword()
    created_at = Date()
    updated_at = Date()
    pushed_at = Date()
    homepage = Keyword()
    size = Integer()
    stargazers_count = Integer()
    forks_count = Integer()
    subscribers_count = Integer()
    fork = Boolean()
    has_issues = Boolean()
    has_projects = Boolean()
    has_downloads = Boolean()
    has_wiki = Boolean()
    has_pages = Boolean()
    open_issues_count = Integer()
    topics = Keyword(multi=True)

    class Meta:
        index = repo_index._name

    @classmethod
    def bulk_save(cls, documents):
        dicts = (d.to_dict(include_meta=True) for d in documents)
        return bulk(client, dicts)

    def save(self, **kwargs):
        return super(RepoInfoDoc, self).save(**kwargs)

RepoInfoDoc.init()
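RepoInfoDoc.init() creates the index and mapping in Elasticsearch. As a quick sanity check, you can index a single document with made-up values and fetch it back by id:

# Hypothetical values, just to verify the mapping works end to end.
doc = RepoInfoDoc(
    owner_username='vinta',
    name='albedo',
    full_name='vinta/albedo',
    description='A recommender system for discovering GitHub repos',
    language='Scala',
    topics=['spark', 'recommender-system'],
)
doc.meta.id = 123456789
doc.save()

print(RepoInfoDoc.get(id=123456789).full_name)  # vinta/albedo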
ref:
Elasticsearch: More than a Search Engine
https://vinta.ws/code/elasticsearch-more-than-a-search-engine.html
https://github.com/elastic/elasticsearch-dsl-py
Import Data into Elasticsearch
There are plenty of ways to get data from MySQL into Elasticsearch, such as a cronjob, Celery, or MySQL binlog replication. Since our main data models are written with the Django ORM, here we simply write a Django management command to import the data.
from django.core.management.base import BaseCommand

from app.mappings import RepoInfoDoc
from app.models import RepoInfo

class Command(BaseCommand):

    def handle(self, *args, **options):
        def batch_qs(qs, batch_size=500):
            total = qs.count()
            for start in range(0, total, batch_size):
                end = min(start + batch_size, total)
                yield (start, end, total, qs[start:end])

        large_qs = RepoInfo.objects.filter(stargazers_count__gte=10, stargazers_count__lte=290000, fork=False)
        for start, end, total, qs_chunk in batch_qs(large_qs):
            documents = []
            for repo_info in qs_chunk:
                repo_info_doc = RepoInfoDoc()
                repo_info_doc.meta.id = repo_info.id
                repo_info_doc.owner_id = repo_info.owner_id
                repo_info_doc.owner_username = repo_info.owner_username
                repo_info_doc.owner_type = repo_info.owner_type
                repo_info_doc.name = repo_info.name
                repo_info_doc.full_name = repo_info.full_name
                repo_info_doc.description = repo_info.description
                repo_info_doc.language = repo_info.language
                repo_info_doc.created_at = repo_info.created_at
                repo_info_doc.updated_at = repo_info.updated_at
                repo_info_doc.pushed_at = repo_info.pushed_at
                repo_info_doc.homepage = repo_info.homepage
                repo_info_doc.size = repo_info.size
                repo_info_doc.stargazers_count = repo_info.stargazers_count
                repo_info_doc.forks_count = repo_info.forks_count
                repo_info_doc.subscribers_count = repo_info.subscribers_count
                repo_info_doc.fork = repo_info.fork
                repo_info_doc.has_issues = repo_info.has_issues
                repo_info_doc.has_projects = repo_info.has_projects
                repo_info_doc.has_downloads = repo_info.has_downloads
                repo_info_doc.has_wiki = repo_info.has_wiki
                repo_info_doc.has_pages = repo_info.has_pages
                repo_info_doc.open_issues_count = repo_info.open_issues_count
                repo_info_doc.topics = repo_info.topics
                documents.append(repo_info_doc)
            RepoInfoDoc.bulk_save(documents)
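Django discovers management commands by file path. Assuming you save the command above as app/management/commands/import_repos_to_es.py (the file name here is hypothetical), you can run it with:

$ python manage.py import_repos_to_es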
ref:
https://github.com/noplay/python-mysql-replication
Find Similar Items
Since this will later be used in Spark as one of the sources of the recommender system's candidate items, we wrap Elasticsearch's More Like This API in a Spark Transformer, so the code below is written in Scala.
Initialize High-level REST Client
Since Elasticsearch 5.x, the high-level REST client is the officially recommended way to talk to the cluster; its usage differs slightly from the old Java TransportClient.
import org.apache.http.HttpHost
import org.elasticsearch.client.{RestClient, RestHighLevelClient}
val lowClient = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")).build()
val highClient = new RestHighLevelClient(lowClient)
ref:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low-usage-initialization.html
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-getting-started-initialization.html
Perform the More Like This Query
We take a userDF as input: a DataFrame of the users for whom we want to generate candidate items. For each user, we first fetch the list of repos they recently starred; the repo ids double as the Elasticsearch document ids, which we then use as the condition of a More Like This query to find other similar repos.
val userRecommendedItemDF = userDF
  .flatMap {
    case (userId: Int) => {
      val itemIds = selectUserStarredRepos(userId)

      val lowClient = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")).build()
      val highClient = new RestHighLevelClient(lowClient)

      val fields = Array("description", "full_name", "language", "topics")
      val texts = Array("")
      val items = itemIds.map((itemId: Int) => new Item("repo", "repo_info_doc", itemId.toString))

      val queryBuilder = moreLikeThisQuery(fields, texts, items)
        .minTermFreq(1)
        .maxQueryTerms(20)

      val searchSourceBuilder = new SearchSourceBuilder()
      searchSourceBuilder.query(queryBuilder)
      searchSourceBuilder.from(0)
      searchSourceBuilder.size($(topK))

      val searchRequest = new SearchRequest()
      searchRequest.indices("repo")
      searchRequest.types("repo_info_doc")
      searchRequest.source(searchSourceBuilder)

      val searchResponse = highClient.search(searchRequest)
      val hits = searchResponse.getHits
      val searchHits = hits.getHits

      val userItemScoreTuples = searchHits.map((searchHit: SearchHit) => {
        val itemId = searchHit.getId.toInt
        val score = searchHit.getScore
        (userId, itemId, score)
      })

      lowClient.close()

      userItemScoreTuples
    }
  }
  .toDF($(userCol), $(itemCol), $(scoreCol))
  .withColumn($(sourceCol), lit(source))

userRecommendedItemDF.show()
userRecommendedItemDF.show()
// +-------+--------+---------+-------+
// |user_id|repo_id |score |source |
// +-------+--------+---------+-------+
// |652070 |26152923|44.360096|content|
// |652070 |28451314|38.752697|content|
// |652070 |16175350|35.676353|content|
// |652070 |10885469|30.280012|content|
// |652070 |24037308|28.488512|content|
// +-------+--------+---------+-------+
You can find the complete code on GitHub:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/ContentRecommenderBuilder.scala
ref:
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-mlt-query.html
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-specialized-queries.html