Build a recommender system with Spark: Content-based and Elasticsearch

In this series of articles, we use tools such as Apache Spark, XGBoost, Elasticsearch, and MySQL to build a machine learning pipeline for a recommender system. A recommender system can be roughly divided into two stages: Candidate Generation and Ranking. The former generates a set of candidate items for each user, with common approaches including Collaborative Filtering, Content-based recommendation, tag matching, popularity charts, and manual curation; the latter ranks those candidates and presents the top N as the final recommendations, commonly with models such as Logistic Regression.

In this article we take Content-based recommendation, one of the common methods in the Candidate Generation stage, as an example: using Elasticsearch's More Like This query, we build a recommender system for GitHub repositories that takes the repos a user has recently starred as input and finds similar repos to serve as the candidate item set.

As an aside, I originally planned to use Spark to turn each repo's text into Word2Vec vectors and precompute the similarity between every pair of repos (a so-called Similarity Join). However, computing similarities between that many repos was far too expensive in time and machines, and even approximate nearest neighbor search with DIMSUM or Locality Sensitive Hashing (LSH) didn't work very well. Then it occurred to me that finding similar or related items is exactly what a search engine does, so I simply dumped all the repo data into Elasticsearch and used the document id as the search condition: a single More Like This query solved it. After all, not everything has to be done in Spark.
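
For reference, a minimal sketch of that abandoned similarity-join approach, using Spark ML's LSH; the repo_vector_df DataFrame and its "features" column (e.g. averaged Word2Vec vectors per repo) are assumptions for illustration:

from pyspark.ml.feature import BucketedRandomProjectionLSH

# repo_vector_df is assumed to have a Vector column named "features"
lsh = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",
                                  bucketLength=2.0, numHashTables=3)
model = lsh.fit(repo_vector_df)

# self-join: find pairs of repos whose Euclidean distance is below the threshold
similar_df = model.approxSimilarityJoin(repo_vector_df, repo_vector_df,
                                        threshold=1.0, distCol="distance")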

The complete code can be found at https://github.com/vinta/albedo.

Set Up Elasticsearch

To keep things simple, we use the official Docker image. Note that Elasticsearch 5.x/6.x introduces quite a few changes compared to earlier versions, such as X-Pack, the high-level REST client, and allowing only one mapping type per index going forward; it's worth skimming the documentation when you have time.

# in elasticsearch.yml
bootstrap.memory_lock: true
cluster.name: albedo
discovery.type: single-node
http.host: 0.0.0.0
node.name: ${HOSTNAME}
xpack.security.enabled: false

# in docker-compose.yml
version: "3"
services:
  django:
    build: .
    hostname: django
    working_dir: /app
    env_file: .docker-assets/django.env
    command: .docker-assets/django_start.sh
    ports:
      - 8000:8000
    volumes:
      - ".:/app"
      - "../albedo-vendors/bin:/usr/local/bin"
      - "../albedo-vendors/dist-packages:/usr/local/lib/python3.5/dist-packages"
    links:
      - mysql
      - elasticsearch
  mysql:
    image: vinta/mysql:5.7
    hostname: mysql
    env_file: .docker-assets/mysql.env
    command: mysqld --character-set-server=utf8 --collation-server=utf8_unicode_ci
    ports:
      - 3306:3306
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:5.6.2
    ports:
      - 9200:9200
      - 9300:9300
    volumes:
      - "./.docker-assets/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml"
    environment:
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"

$ docker-compose up

Your Elasticsearch cluster is then accessible at http://127.0.0.1:9200/.

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/5.6/docker.html
https://www.elastic.co/guide/en/elasticsearch/reference/5.6/security-settings.html

Define the Mapping (Data Schema)

Here we use elasticsearch-dsl-py to define an index and a mapping type.

from elasticsearch.helpers import bulk
from elasticsearch_dsl import analyzer
from elasticsearch_dsl import Date, Integer, Keyword, Text, Boolean
from elasticsearch_dsl import Index, DocType
from elasticsearch_dsl.connections import connections

client = connections.create_connection(hosts=['elasticsearch'])

repo_index = Index('repo')
repo_index.settings(
    number_of_shards=1,
    number_of_replicas=0
)

text_analyzer = analyzer(
    'text_analyzer',
    char_filter=["html_strip"],
    tokenizer="standard",
    filter=["asciifolding", "lowercase", "snowball", "stop"]
)
repo_index.analyzer(text_analyzer)

@repo_index.doc_type
class RepoInfoDoc(DocType):
    owner_id = Keyword()
    owner_username = Keyword()
    owner_type = Keyword()
    name = Text(analyzer=text_analyzer, fields={'raw': Keyword()})
    full_name = Text(analyzer=text_analyzer, fields={'raw': Keyword()})
    description = Text(analyzer=text_analyzer)
    language = Keyword()
    created_at = Date()
    updated_at = Date()
    pushed_at = Date()
    homepage = Keyword()
    size = Integer()
    stargazers_count = Integer()
    forks_count = Integer()
    subscribers_count = Integer()
    fork = Boolean()
    has_issues = Boolean()
    has_projects = Boolean()
    has_downloads = Boolean()
    has_wiki = Boolean()
    has_pages = Boolean()
    open_issues_count = Integer()
    topics = Keyword(multi=True)

    class Meta:
        index = repo_index._name

    @classmethod
    def bulk_save(cls, documents):
        dicts = (d.to_dict(include_meta=True) for d in documents)
        return bulk(client, dicts)

    def save(self, **kwargs):
        return super(RepoInfoDoc, self).save(**kwargs)

RepoInfoDoc.init()

Elasticsearch: More than a Search Engine
https://vinta.ws/code/elasticsearch-more-than-a-search-engine.html

ref:
https://github.com/elastic/elasticsearch-dsl-py

Import Data into Elasticsearch

There are many ways to get data stored in MySQL into Elasticsearch, such as a cron job, Celery, or MySQL binlog replication. Since our main data models are written with the Django ORM, here we simply write a Django management command to import the data.

from django.core.management.base import BaseCommand

from app.mappings import RepoInfoDoc
from app.models import RepoInfo

class Command(BaseCommand):
    def handle(self, *args, **options):
        # iterate over a large queryset in chunks to keep memory usage bounded
        def batch_qs(qs, batch_size=500):
            total = qs.count()
            for start in range(0, total, batch_size):
                end = min(start + batch_size, total)
                yield (start, end, total, qs[start:end])

        large_qs = RepoInfo.objects.filter(stargazers_count__gte=10, stargazers_count__lte=290000, fork=False)
        for start, end, total, qs_chunk in batch_qs(large_qs):
            documents = []
            for repo_info in qs_chunk:
                repo_info_doc = RepoInfoDoc()
                repo_info_doc.meta.id = repo_info.id
                repo_info_doc.owner_id = repo_info.owner_id
                repo_info_doc.owner_username = repo_info.owner_username
                repo_info_doc.owner_type = repo_info.owner_type
                repo_info_doc.name = repo_info.name
                repo_info_doc.full_name = repo_info.full_name
                repo_info_doc.description = repo_info.description
                repo_info_doc.language = repo_info.language
                repo_info_doc.created_at = repo_info.created_at
                repo_info_doc.updated_at = repo_info.updated_at
                repo_info_doc.pushed_at = repo_info.pushed_at
                repo_info_doc.homepage = repo_info.homepage
                repo_info_doc.size = repo_info.size
                repo_info_doc.stargazers_count = repo_info.stargazers_count
                repo_info_doc.forks_count = repo_info.forks_count
                repo_info_doc.subscribers_count = repo_info.subscribers_count
                repo_info_doc.fork = repo_info.fork
                repo_info_doc.has_issues = repo_info.has_issues
                repo_info_doc.has_projects = repo_info.has_projects
                repo_info_doc.has_downloads = repo_info.has_downloads
                repo_info_doc.has_wiki = repo_info.has_wiki
                repo_info_doc.has_pages = repo_info.has_pages
                repo_info_doc.open_issues_count = repo_info.open_issues_count
                repo_info_doc.topics = repo_info.topics

                documents.append(repo_info_doc)

            RepoInfoDoc.bulk_save(documents)

noplay/python-mysql-replication
https://github.com/noplay/python-mysql-replication

Find Similar Items

Since this will later be used in Spark as one of the sources of the candidate item set for the recommender system, we wrap Elasticsearch's More Like This API as a Spark Transformer, so the following code is written in Scala.

Initialize High-level REST Client

Since Elasticsearch 5.x, the officially recommended client is the high-level REST client, whose usage differs slightly from the old Java TransportClient.

import org.apache.http.HttpHost
import org.elasticsearch.client.{RestClient, RestHighLevelClient}

val lowClient = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")).build()
val highClient = new RestHighLevelClient(lowClient)

ref:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low-usage-initialization.html
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-getting-started-initialization.html

Perform the More Like This Query

The input is userDF, a DataFrame of the users for whom we want to generate candidate items. For each user we first fetch the list of repos the user recently starred; since a repo's id is also its Elasticsearch document id, we use those ids as the condition of a More Like This query to find other similar repos.

import org.apache.spark.sql.functions.lit
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.index.query.MoreLikeThisQueryBuilder.Item
import org.elasticsearch.index.query.QueryBuilders.moreLikeThisQuery
import org.elasticsearch.search.SearchHit
import org.elasticsearch.search.builder.SearchSourceBuilder

val userRecommendedItemDF = userDF
  .flatMap {
    case (userId: Int) => {
      val itemIds = selectUserStarredRepos(userId)

      val lowClient = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")).build()
      val highClient = new RestHighLevelClient(lowClient)

      val fields = Array("description", "full_name", "language", "topics")
      val texts = Array("")
      val items = itemIds.map((itemId: Int) => new Item("repo", "repo_info_doc", itemId.toString))
      val queryBuilder = moreLikeThisQuery(fields, texts, items)
        .minTermFreq(1)
        .maxQueryTerms(20)

      val searchSourceBuilder = new SearchSourceBuilder()
      searchSourceBuilder.query(queryBuilder)
      searchSourceBuilder.from(0)
      searchSourceBuilder.size($(topK))

      val searchRequest = new SearchRequest()
      searchRequest.indices("repo")
      searchRequest.types("repo_info_doc")
      searchRequest.source(searchSourceBuilder)

      val searchResponse = highClient.search(searchRequest)
      val hits = searchResponse.getHits
      val searchHits = hits.getHits

      val userItemScoreTuples = searchHits.map((searchHit: SearchHit) => {
        val itemId = searchHit.getId.toInt
        val score = searchHit.getScore
        (userId, itemId, score)
      })

      lowClient.close()

      userItemScoreTuples
    }
  }
  .toDF($(userCol), $(itemCol), $(scoreCol))
  .withColumn($(sourceCol), lit(source))

userRecommendedItemDF.show()
// +-------+--------+---------+-------+
// |user_id|repo_id |score    |source |
// +-------+--------+---------+-------+
// |652070 |26152923|44.360096|content|
// |652070 |28451314|38.752697|content|
// |652070 |16175350|35.676353|content|
// |652070 |10885469|30.280012|content|
// |652070 |24037308|28.488512|content|
// +-------+--------+---------+-------+

You can find the complete code on GitHub:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/ContentRecommenderBuilder.scala

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-mlt-query.html
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-specialized-queries.html

elasticsearch-dsl-py: The Official Elasticsearch ORM in Python

Query DSL is Elasticsearch's domain-specific language (DSL) for queries; in practice it's just a bunch of JSON. elasticsearch-dsl is an official Python package for building Query DSL programmatically; you can think of it as an ORM for Elasticsearch.

Hopefully querying with plain SQL will be supported natively someday, because the Query DSL is a real pain to write by hand.
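
As a minimal sketch of what the package does for you, a Q object is simply a builder for that raw JSON:

from elasticsearch_dsl import Q

q = Q('match', title={'query': 'python', 'fuzziness': 'AUTO'})
assert q.to_dict() == {'match': {'title': {'query': 'python', 'fuzziness': 'AUTO'}}}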

ref:
https://github.com/elastic/elasticsearch-dsl-py

Installation

$ pip install "elasticsearch-dsl>=5.0.0,<6.0.0"

ref:
https://elasticsearch-dsl.readthedocs.org/en/latest/index.html

Schema

# in app/mappings.py

from elasticsearch_dsl import DocType, String, Boolean
from elasticsearch_dsl.connections import connections
connections.create_connection(hosts=['127.0.0.1', ])

class AlbumDoc(DocType):
    upc = String(index='not_analyzed')
    title = String(analyzer='ik', fields={'raw': String(index='not_analyzed')})
    artist = String(analyzer='ik')
    is_ready = Boolean()

    class Meta:
        index = 'dps'
        doc_type = 'album'

    @classmethod
    def sync(cls, album):
        album_doc = AlbumDoc(meta={'id': album.id})
        album_doc.upc = album.get_upcs(output_str=False)
        album_doc.title = album.name
        album_doc.artist = album.artist.name
        album_doc.is_ready = album.is_ready
        album_doc.save()

    def save(self, *args, **kwargs):
        return super(AlbumDoc, self).save(*args, **kwargs)

    def get_model_obj(self):
        from svapps.dps.models import Album
        return Album.objects.get(id=self.meta.id)

# to create mappings
AlbumDoc.init()

Be sure to run YourDocType.init() once, so that Elasticsearch creates the mapping according to your DocType. Otherwise, Elasticsearch will build the mapping from the data types it sees the first time you index data, and settings such as the analyzer will fall back to the default standard analyzer. You can inspect the result via the _mapping API.
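
As a quick sketch, you can fetch the generated mapping from Python via the low-level client behind elasticsearch-dsl (the index name 'dps' matches the example above):

from elasticsearch_dsl.connections import connections

client = connections.get_connection()
print(client.indices.get_mapping(index='dps'))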

Fields that need full-text search should be analyzed (string fields are analyzed by default). Fields that don't need full-text search, i.e., fields that require exact matching such as username, email, or zip code, can be set to not_analyzed. Note that you can't run a term query against an analyzed field, unless you also create an extra raw sub-field for it.
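
For example, since AlbumDoc above defines a not_analyzed title.raw sub-field, an exact-match filter looks like this (the title value is illustrative):

search = AlbumDoc.search().filter('term', **{'title.raw': 'some exact title'})
response = search.execute()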

ref:
https://elasticsearch-dsl.readthedocs.org/en/latest/persistence.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-term-query.html#CO59-2

Store Data

album_doc = AlbumDoc(meta={'id': 42})
album_doc.upc = ['887375000619', '887375502069']
album_doc.title = 'abc'
album_doc.artist = 'xyz'
album_doc.is_ready = True
album_doc.save()

# query as usual; whether the stored value is a list doesn't matter
search = AlbumDoc.search().filter('term', upc='887375000619')
response = search.execute()

Because Elasticsearch is schemaless, you can still store a list in a field you defined as String.

Search Data

  • must: all of the clauses must match
  • should: at least one of the clauses should match

from elasticsearch_dsl import Q, Search

search = TrackDoc.search() \
    .filter('term', is_ready=True) \
    .query('match', title=u'沒有的啊')

search = TrackDoc.search() \
    .filter('term', is_ready=True) \
    .query(
        Q('match', title='沒有的啊') & \
        Q('match', artist='那我懂你意思了') & \
        Q('match', album='沒有的, 啊!?')
    )

q = Q(
    'bool',
    must=[
        Q('match', title={'query': track_name, 'fuzziness': 'AUTO'}),
    ],
    should=[
        Q('match', album={'query': album_name, 'minimum_should_match': '60%'}),
        Q('match', artist={'query': artist_name, 'minimum_should_match': '80%'}),
    ],
    minimum_should_match=1
)
search = TrackDoc.search().filter('term', is_ready=True).query(q)

# assume `keyword` holds the user's raw search input; it is renamed from `q`
# here to avoid shadowing the query object built below (illustrative value)
keyword = '887375000619'

q = Q(
    'bool',
    should=[
        Q('term', isrc=keyword),
        Q('term', upc=keyword),
        Q('match', **{'title.raw': {'query': keyword}}),
        Q('multi_match', query=keyword, fields=['title', 'artist', 'album']),
    ],
)
search = Search(index='dps', doc_type=['track', 'album']).query(q)
search = search[:20]

# print the raw Query DSL
import uniout  # makes non-ASCII strings display readably in repr output
from pprint import pprint
pprint(search.to_dict())

response = search.execute()

print(response.hits.total)
print(response[0].title)
print(response[0].artist)
print(response[0].album)
print(response[0].is_ready)

ref:
https://elasticsearch-dsl.readthedocs.org/en/latest/search_dsl.html

Elasticsearch: More than a Search Engine

Elasticsearch is a schemaless, document-oriented search engine with a bunch of powerful querying APIs. It also makes a great NoSQL database.

ref:
https://www.elastic.co/products/elasticsearch

Glossary

The definitions below are based on Elasticsearch 5.6 and may differ from older versions. In recent versions of Elasticsearch, each index can have only one mapping type, whereas earlier versions allowed several.

  • cluster: a cluster consists of one or more nodes, and a master node is elected automatically
  • node: a machine running Elasticsearch
  • index: analogous to a table in a relational database
  • mapping: analogous to a table schema in a relational database
  • field: analogous to a column in a relational database
  • document: analogous to a row in a relational database
  • text: arbitrary unstructured text; text is analyzed into terms before it can be searched
  • term: what is actually stored and searched in Elasticsearch
  • analysis: the process of turning text into terms, e.g. normalization, tokenization, and stopword removal

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/current/glossary.html

Mapping

A mapping can be specified explicitly. If you don't provide one, Elasticsearch will automatically create a mapping based on the first data it receives, so some field settings (such as the analyzer) may not be what you expect; it's best to define the mapping yourself.

Commonly used field data types:

  • text: a string type, used for full-text search
  • keyword: a string type, used for exact-value filtering, sorting, and aggregations (the equivalent of not_analyzed in older versions)

In Elasticsearch, the same field can be indexed as multiple data types. For example, via the fields property, a location field can be indexed as both text and keyword, for full-text search and exact-value filtering respectively; the sub-fields can even use different analyzers. See the sketch below.
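
A minimal sketch with elasticsearch-dsl-py (the PlaceDoc class and its location field are hypothetical):

from elasticsearch_dsl import DocType, Keyword, Text

class PlaceDoc(DocType):
    # full-text search goes through "location",
    # exact-value filtering and sorting through "location.raw"
    location = Text(fields={'raw': Keyword()})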

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html

Analysis

An analyzer consists of three parts:

  • Character filters
  • Tokenizers
  • Token filters

You can assemble your own analyzer. With elasticsearch-dsl-py, for example:

from elasticsearch_dsl import DocType, Date, Integer, Keyword, Text, Boolean
from elasticsearch_dsl import analyzer, tokenizer

text_analyzer = analyzer('text_analyzer',
    char_filter=["html_strip"],
    tokenizer="standard",
    filter=["asciifolding", "lowercase", "snowball", "stop"]
)

cjk_analyzer = analyzer('cjk_analyzer',
    char_filter=["html_strip"],
    tokenizer=tokenizer('trigram', 'nGram', min_gram=2, max_gram=3),
    filter=["asciifolding", "lowercase", "snowball", "stop"]
)

ref:
http://elasticsearch-dsl.readthedocs.io/en/latest/persistence.html

Testing Analyzer

Test how a piece of text is handled by a given combination of analysis components:

POST http://127.0.0.1:9200/_analyze
{
  "tokenizer": "standard",
  "filter": ["lowercase", "asciifolding"],
  "text": "Is this déja vu?"
}

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-analyzers.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-tokenizers.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-tokenfilters.html

Chinese Word Segmentation

Word-segmentation plugins such as ik don't perform all that well. The built-in cjk analyzer combined with NGram may be a better choice (you can use a multi-field index). Another approach is to segment the text yourself before feeding it into Elasticsearch, for example with Jieba, joining the segmented tokens with spaces and letting Elasticsearch's whitespace tokenizer split them back apart, as sketched below.
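
A minimal sketch of the pre-segmentation approach, assuming Jieba is installed:

import jieba

def segment(text):
    # join the segmented tokens with spaces so that Elasticsearch's
    # whitespace tokenizer can split them back apart at index time
    return ' '.join(jieba.cut_for_search(text))

print(segment(u'中文搜尋經驗分享'))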

中文搜尋經驗分享 (a talk in Chinese on building Chinese search)
https://blog.liang2.tw/2015Talk-Chinese-Search/

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-lang-analyzer.html#cjk-analyzer
https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-ngram-tokenizer.html
https://www.elastic.co/guide/en/elasticsearch/plugins/current/analysis-smartcn.html << not recommended
https://github.com/medcl/elasticsearch-analysis-ik << barely acceptable

RESTful APIs

Show useful information for humans
http://127.0.0.1:9200/_cat
https://www.elastic.co/guide/en/elasticsearch/reference/current/cat.html

List all indices and aliases
http://127.0.0.1:9200/_aliases

List mappings under an index
http://127.0.0.1:9200/repo/_mapping

List documents under an index
http://127.0.0.1:9200/repo/_search

Query DSL

The query is the main body of your search, while a filter is a precondition applied to that search.
https://www.elastic.co/guide/en/elasticsearch/guide/master/search-in-depth.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-queries.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-filters.html

For exact-value queries, use term
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-term-query.html

For full-text queries, use match
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-match-query.html

To query several fields at once, use multi_match
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-multi-match-query.html

To combine conditions with AND (must), OR (should), and NOT (must_not), use bool
https://www.elastic.co/guide/en/elasticsearch/guide/master/bool-query.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-bool-query.html

To combine a filter with a query, the old way was the filtered query; note that it was removed in Elasticsearch 5.0, where a bool query with a filter clause takes its place (sketched below)
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-filtered-query.html
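
A sketch of the bool-with-filter equivalent in elasticsearch-dsl:

from elasticsearch_dsl import Q

q = Q(
    'bool',
    must=[Q('match', title='python')],       # scored, full-text part
    filter=[Q('term', is_ready=True)],       # unscored, cacheable precondition
)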

More Like This query: besides free text, you can also pass document ids directly to find similar documents
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-mlt-query.html

To boost specific fields at query time
https://www.elastic.co/guide/en/elasticsearch/guide/current/query-time-boosting.html
https://www.elastic.co/guide/en/elasticsearch/guide/current/_boosting_query_clauses.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-boosting-query.html

Multi-index, Multi-type

Besides searching a single type, you can also search across indices and types (see the sketch after the list):

  • /_search: Search all types in all indices
  • /gb/_search: Search all types in the gb index
  • /gb,us/_search: Search all types in the gb and us indices
  • /g*,u*/_search: Search all types in any indices beginning with g or beginning with u
  • /gb/user/_search: Search type user in the gb index
  • /gb,us/user,tweet/_search: Search types user and tweet in the gb and us indices
  • /_all/user,tweet/_search: Search types user and tweet in all indices
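
The same idea via elasticsearch-dsl, as a sketch (assuming a default connection has been created):

from elasticsearch_dsl import Search

# search types user and tweet in the gb and us indices
search = Search(index=['gb', 'us'], doc_type=['user', 'tweet'])
response = search.execute()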

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html#search-multi-index-type