Build a recommender system with Spark: Content-based and Elasticsearch

Build a recommender system with Spark: Content-based and Elasticsearch

在這個系列的文章裡,我們將使用 Apache Spark、XGBoost、Elasticsearch 和 MySQL 等工具來搭建一個推薦系統的 Machine Learning Pipeline。推薦系統的組成可以粗略地分成 Candidate Generation 和 Ranking 兩個部分,前者是針對用戶產生候選物品集,常用的方法有 Collaborative Filtering、Content-based、標籤配對、熱門排行或人工精選等;後者則是對這些候選物品排序,以 Top N 的方式呈現最終的推薦結果,常用的方法有 Logistic Regression。

在本篇文章中,我們以 Candidate Generation 階段常用的方法之一:Content-based recommendation 基於內容的推薦為例,利用 Elasticsearch 的 More Like This query 建立一個 GitHub repositories 的推薦系統,以用戶最近打星過的 repo 作為輸入數據,比對出相似的其他 repo 作為候選物品集。

題外話,我原本是打算用 Spark 把 repo 的文本資料轉成 Word2Vec 向量,然後事先計算好各個 repo 之間的相似度(所謂的 Similarity Join),但是要計算這麼多 repo 之間的相似度實在太耗時間和機器了,就算用了 DIMSUM 和 Locality Sensitive Hashing (LSH) 的 Approximate Nearest Neighbor Search 的效果也不是很好。後來一想,尋找相似或相關物品這件事不就是搜尋引擎在做的嗎,所以直接把 repo 的各種資料丟進 Elasticsearch,用 document id 當作搜尋條件,一個 More Like This query 就解決了,爽快。畢竟不需要所有的事情都在 Spark 裡解決嘛。

完整的程式碼可以在 https://github.com/vinta/albedo 找到。

Setup Elasticsearch

為了讓事情簡單一點,我們直接用官方包裝好的 Docker image。另外要注意的是,Elasticsearch 5.x/6.x 跟之前的版本比起來有不小的改動,例如 X-Pack、high-level REST client 和以後每個 index 只能有一個 mapping type 等等,建議大家有空可以翻一下文件。

# in elasticsearch.yml
bootstrap.memory_lock: true
cluster.name: albedo
discovery.type: single-node
http.host: 0.0.0.0
node.name: ${HOSTNAME}
xpack.security.enabled: false
# in docker-compose.yml
version: "3"
services:
  django:
    build: .
    hostname: django
    working_dir: /app
    env_file: .docker-assets/django.env
    command: .docker-assets/django_start.sh
    ports:
      - 8000:8000
    volumes:
      - ".:/app"
      - "../albedo-vendors/bin:/usr/local/bin"
      - "../albedo-vendors/dist-packages:/usr/local/lib/python3.5/dist-packages"
    links:
      - mysql
      - elasticsearch
  mysql:
    image: vinta/mysql:5.7
    hostname: mysql
    env_file: .docker-assets/mysql.env
    command: mysqld --character-set-server=utf8 --collation-server=utf8_unicode_ci
    ports:
      - 3306:3306
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:5.6.2
    ports:
      - 9200:9200
      - 9300:9300
    volumes:
      - "./.docker-assets/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml"
    environment:
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
$ docker-compose up

然後就可以在 http://127.0.0.1:9200/ 存取你的 Elasticsearch cluster 了。

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/5.6/docker.html
https://www.elastic.co/guide/en/elasticsearch/reference/5.6/security-settings.html

Define the Mapping (Data Schema)

這裡用 elasticsearch-dsl-py 定義了一個 index 和 mapping type。

from elasticsearch.helpers import bulk
from elasticsearch_dsl import analyzer
from elasticsearch_dsl import Date, Integer, Keyword, Text, Boolean
from elasticsearch_dsl import Index, DocType
from elasticsearch_dsl.connections import connections

client = connections.create_connection(hosts=['elasticsearch'])

repo_index = Index('repo')
repo_index.settings(
    number_of_shards=1,
    number_of_replicas=0
)

text_analyzer = analyzer(
    'text_analyzer',
    char_filter=["html_strip"],
    tokenizer="standard",
    filter=["asciifolding", "lowercase", "snowball", "stop"]
)
repo_index.analyzer(text_analyzer)

@repo_index.doc_type
class RepoInfoDoc(DocType):
    owner_id = Keyword()
    owner_username = Keyword()
    owner_type = Keyword()
    name = Text(text_analyzer, fields={'raw': Keyword()})
    full_name = Text(text_analyzer, fields={'raw': Keyword()})
    description = Text(text_analyzer)
    language = Keyword()
    created_at = Date()
    updated_at = Date()
    pushed_at = Date()
    homepage = Keyword()
    size = Integer()
    stargazers_count = Integer()
    forks_count = Integer()
    subscribers_count = Integer()
    fork = Boolean()
    has_issues = Boolean()
    has_projects = Boolean()
    has_downloads = Boolean()
    has_wiki = Boolean()
    has_pages = Boolean()
    open_issues_count = Integer()
    topics = Keyword(multi=True)

    class Meta:
        index = repo_index._name

    @classmethod
    def bulk_save(cls, documents):
        dicts = (d.to_dict(include_meta=True) for d in documents)
        return bulk(client, dicts)

    def save(self, **kwargs):
        return super(RepoInfoDoc, self).save(**kwargs)

RepoInfoDoc.init()

Elasticsearch: More than a Search Engine
https://vinta.ws/code/elasticsearch-more-than-a-search-engine.html

ref:
https://github.com/elastic/elasticsearch-dsl-py

Import Data into Elasticsearch

你可以透過很多種手段把存在 MySQL 裡的資料倒進 Elasticsearch,例如 cronjob、Celery 或 MySQL binglog,不過因為我們主要的 data models 是用 Django ORM 寫的,這裡就簡單地寫個 Django command 把資料倒進去就好。

from django.core.management.base import BaseCommand

from app.mappings import RepoInfoDoc
from app.models import RepoInfo

class Command(BaseCommand):
    def handle(self, *args, **options):
        def batch_qs(qs, batch_size=500):
            total = qs.count()
            for start in range(0, total, batch_size):
                end = min(start + batch_size, total)
                yield (start, end, total, qs[start:end])

        large_qs = RepoInfo.objects.filter(stargazers_count__gte=10, stargazers_count__lte=290000, fork=False)
        for start, end, total, qs_chunk in batch_qs(large_qs):
            documents = []
            for repo_info in qs_chunk:
                repo_info_doc = RepoInfoDoc()
                repo_info_doc.meta.id = repo_info.id
                repo_info_doc.owner_id = repo_info.owner_id
                repo_info_doc.owner_username = repo_info.owner_username
                repo_info_doc.owner_type = repo_info.owner_type
                repo_info_doc.name = repo_info.name
                repo_info_doc.full_name = repo_info.full_name
                repo_info_doc.description = repo_info.description
                repo_info_doc.language = repo_info.language
                repo_info_doc.created_at = repo_info.created_at
                repo_info_doc.updated_at = repo_info.updated_at
                repo_info_doc.pushed_at = repo_info.pushed_at
                repo_info_doc.homepage = repo_info.homepage
                repo_info_doc.size = repo_info.size
                repo_info_doc.stargazers_count = repo_info.stargazers_count
                repo_info_doc.forks_count = repo_info.forks_count
                repo_info_doc.subscribers_count = repo_info.subscribers_count
                repo_info_doc.fork = repo_info.fork
                repo_info_doc.has_issues = repo_info.has_issues
                repo_info_doc.has_projects = repo_info.has_projects
                repo_info_doc.has_downloads = repo_info.has_downloads
                repo_info_doc.has_wiki = repo_info.has_wiki
                repo_info_doc.has_pages = repo_info.has_pages
                repo_info_doc.open_issues_count = repo_info.open_issues_count
                repo_info_doc.topics = repo_info.topics

                documents.append(repo_info_doc)

            RepoInfoDoc.bulk_save(documents)

Find Similar Items

因為之後會在 Spark 裡作為推薦系統的候選物品集的來源之一,我們會把 Elasticsearch 的 More Like This API 封裝成一個 Spark 的 Transformer,所以以下的部分是用 Scala 寫的。

Initialize High-level REST Client

Elasticsearch 5.x 之後官方建議使用 High-level REST Client,用法跟以前 Java 的 TransportClient 稍微有點不同。

import org.apache.http.HttpHost
import org.elasticsearch.client.{RestClient, RestHighLevelClient}

val lowClient = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")).build()
val highClient = new RestHighLevelClient(lowClient)

ref:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low-usage-initialization.html
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-getting-started-initialization.html

Perform the More Like This Query

我們會輸入一個 userDF,是一個要產生候選物品集的用戶的 DataFrame,然後會先拿到每個用戶最近打星過的 repo 的列表,repo id 就是 Elasticsearch 的 document id,以此為條件用 More Like This query 找出相似的其他 repo。

val userRecommendedItemDF = userDF
  .flatMap {
    case (userId: Int) => {
      val itemIds = selectUserStarredRepos(userId)

      val lowClient = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")).build()
      val highClient = new RestHighLevelClient(lowClient)

      val fields = Array("description", "full_name", "language", "topics")
      val texts = Array("")
      val items = itemIds.map((itemId: Int) => new Item("repo", "repo_info_doc", itemId.toString))
      val queryBuilder = moreLikeThisQuery(fields, texts, items)
        .minTermFreq(1)
        .maxQueryTerms(20)

      val searchSourceBuilder = new SearchSourceBuilder()
      searchSourceBuilder.query(queryBuilder)
      searchSourceBuilder.from(0)
      searchSourceBuilder.size($(topK))

      val searchRequest = new SearchRequest()
      searchRequest.indices("repo")
      searchRequest.types("repo_info_doc")
      searchRequest.source(searchSourceBuilder)

      val searchResponse = highClient.search(searchRequest)
      val hits = searchResponse.getHits
      val searchHits = hits.getHits

      val userItemScoreTuples = searchHits.map((searchHit: SearchHit) => {
        val itemId = searchHit.getId.toInt
        val score = searchHit.getScore
        (userId, itemId, score)
      })

      lowClient.close()

      userItemScoreTuples
    }
  }
  .toDF($(userCol), $(itemCol), $(scoreCol))
  .withColumn($(sourceCol), lit(source))

userRecommendedItemDF.show()
// +-------+--------+---------+-------+
// |user_id|repo_id |score    |source |
// +-------+--------+---------+-------+
// |652070 |26152923|44.360096|content|
// |652070 |28451314|38.752697|content|
// |652070 |16175350|35.676353|content|
// |652070 |10885469|30.280012|content|
// |652070 |24037308|28.488512|content|
// +-------+--------+---------+-------+

完整的程式碼可以在 https://github.com/vinta/albedo 找到。

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-mlt-query.html
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-specialized-queries.html

Change index mappings with zero downtime using elasticsearch-py

Basically you can't change mappings (so-called "schema") in Elasticsearch. You may add fields free but changing field definitions (field types or analyzers) of mappings is impossible. One way or another, you need to create a new index.

Steps:

  • Create an alias my_index which points to the old index my_index_v1
  • Use my_index instead of my_index_v1 in your application
  • Create a new index my_index_v2 with new mappings
  • Transfer documents from old index to new index - a.k.a. reindex
  • Associate the alias my_index with index my_index_v2
  • Delete the old index my_index_v1
from datetime import datetime

from elasticsearch import Elasticsearch
from elasticsearch.helpers import reindex

es_client = Elasticsearch(hosts=settings.ES_HOSTS)

# make sure that this alias doesn't conflict with any existing index name
alias = 'packer'

# CAUTION: if you have an index already, you should create an alias for it first
# es_client.indices.put_alias(index='your_current_index', name=alias)

old_indexes = list(es_client.indices.get_alias(alias).keys())
try:
    old_index = old_indexes[0]
except IndexError:
    old_index = None
else:
    if len(old_indexes) > 1:
        raise RuntimeError('Alias `{0}` points to {1} indexes that may cause error when writing data to `{0}`'.format(alias, len(old_indexes)))

new_index = '{}_{}'.format(alias, datetime.now().strftime('%Y%m%d%H%M%S%f'))

available_types = [TrackDoc, AlbumDoc]
for my_doc_type in available_types:
    # create a new index with new mappings
    my_doc_type.init(index=new_index)

if old_index:
    # transfer documents from old index to new index
    reindex(es_client, source_index=old_index, target_index=new_index)

    es_client.indices.update_aliases({
        'actions': [
            {'remove': {'index': old_index, 'alias': alias}},
            {'add': {'index': new_index, 'alias': alias}},
        ],
    })
else:
    es_client.indices.update_aliases({
        'actions': [
            {'add': {'index': new_index, 'alias': alias}},
        ],
    })

ref:
https://www.elastic.co/blog/changing-mapping-with-zero-downtime
https://blog.codecentric.de/en/2014/09/elasticsearch-zero-downtime-reindexing-problems-solutions/
http://elasticsearch-py.readthedocs.org/en/master/helpers.html#elasticsearch.helpers.reindex

An alias can point to multiple indexes, in that case, reading (searching) from the alias performs perfectly, writing (indexing) to the alias raises an exception: Alias [my_index] has more than one indices associated with it [[my_index_v1, my_index_v2]], can't execute a single index op.

It's not recommended to set the same alias for multiple indexes unless explicitly using a specific index for writing data.

ref:
https://www.elastic.co/guide/en/elasticsearch/guide/current/multiple-indices.html

Create aliases

# list all indexes and their aliases
$ curl 'http://127.0.0.1:9200/_aliases'

# create an alias
$ curl -XPOST 'http://127.0.0.1:9200/_aliases' -d '
{
    "actions" : [
        { "add" : { "index" : "dps", "alias" : "packer" } }
    ]
}
'

# delete all indexes and aliases
$ curl -XDELETE 'http://127.0.0.1:9200/*/'

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/1.5/indices-aliases.html

Update index settings

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/1.5/indices-put-mapping.html
https://www.elastic.co/guide/en/elasticsearch/reference/1.5/indices-update-settings.html
https://gist.github.com/nicolashery/6317643

Use elasticsearch-dsl with Python

Query DSL 是 Elasticsearch 的查詢用 Domain-specific Language (DSL),實際上就是一堆 JSON。elasticsearch-dsl 是官方發佈的一套用來操作 Query DSL 的 Python package,可以當成是 Elasticsearch 的 ORM。

希望之後可以直接支援用 SQL 來查詢,不然 Query DSL 真的有夠難寫。

ref:
https://github.com/elastic/elasticsearch-dsl-py

Installation

$ pip install elasticsearch-dsl>=5.0.0,<6.0.0

ref:
https://elasticsearch-dsl.readthedocs.org/en/latest/index.html

Schema

in app/mappings.py

from elasticsearch_dsl import DocType, String, Boolean
from elasticsearch_dsl.connections import connections
connections.create_connection(hosts=['127.0.0.1', ])

class AlbumDoc(DocType):
    upc = String(index='not_analyzed')
    title = String(analyzer='ik', fields={'raw': String(index='not_analyzed')})
    artist = String(analyzer='ik')
    is_ready = Boolean()

    class Meta:
        index = 'dps'
        doc_type = 'album'

    @classmethod
    def sync(cls, album):
        album_doc = AlbumDoc(meta={'id': album.id})
        album_doc.upc = album.get_upcs(output_str=False)
        album_doc.title = album.name
        album_doc.artist = album.artist.name
        album_doc.is_ready = album.is_ready
        album_doc.save()

    def save(self, *args, **kwargs):
        return super(AlbumDoc, self).save(*args, **kwargs)

    def get_model_obj(self):
        from svapps.dps.models import Album
        return Album.objects.get(id=self.meta.id)

# to create mappings
AlbumDoc.init()

一定要執行一次 YourDocType.init(),這樣 Elasticsearch 才會根據你的 DocType 產生對應的 mapping。否則 Elasticsearch 就會在你第一次倒資料進去的時候根據你的資料的 data type 建立對應的 mapping,所以 analyzer 之類的設定就會是預設的 standard,你可以透過 _mapping API 來檢查。

需要全文搜尋的欄位要設為 analyzed(string 欄位默認都是 analyzed),不需要全文搜尋的欄位,也就是要求精確的欄位,例如:usernameemailzip code,就可以設成 not_analyzed,但是你就不能對 analyzed 的欄位使用 term 了,除非你對該欄位額外再建立一個 raw 欄位。

ref:
https://elasticsearch-dsl.readthedocs.org/en/latest/persistence.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-term-query.html#CO59-2

Store Data

album_doc = AlbumDoc(meta={'id': 42})
album_doc.upc = ['887375000619', '887375502069']
album_doc.title = 'abc'
album_doc.artist = 'xyz'
album_doc.is_ready = True
album_doc.save()

# 可以如常地 query,不用管它是不是 list
search = AlbumDoc.search().filter('term', upc='887375000619')
response = search.execute()

因為 Elasticsearch 是 schemaless,所以即使你定義了 String 欄位,還是可以存一個 list 進去。

Search Data

  • must:必須符合所有條件
  • should:符合其中一個條件即可
search = TrackDoc.search() \
    .filter('term', is_ready=True) \
    .query('match', title=u'沒有的啊')

search = TrackDoc.search() \
    .filter('term', is_ready=True) \
    .query(
        Q('match', title='沒有的啊') & \
        Q('match', artist='那我懂你意思了') & \
        Q('match', album='沒有的, 啊!?')
    )

q = Q(
    'bool',
    must=[
        Q('match', title={'query': track_name, 'fuzziness': 'AUTO'}),
    ],
    should=[
        Q('match', album={'query': album_name, 'minimum_should_match': '60%'}),
        Q('match', artist={'query': artist_name, 'minimum_should_match': '80%'}),
    ],
    minimum_should_match=1
)
search = TrackDoc.search().filter('term', is_ready=True).query(q)

q = Q(
    'bool',
    should=[
        Q('term', isrc=q),
        Q('term', upc=q),
        Q('match', **{'title.raw': {'query': q}}),
        Q('multi_match', query=q, fields=['title', 'artist', 'album']),
    ],
)
search = Search(index='dps', doc_type=['track', 'album']).query(q)
search = search[:20]

# print the raw Query DSL
import uniout
from pprint import pprint
pprint(search.to_dict())

response = search.execute()

print(response.hits.total)
print(response[0].title)
print(response[0].artist)
print(response[0].album)
print(response[0].is_ready)

ref:
https://elasticsearch-dsl.readthedocs.org/en/latest/search_dsl.html