ほんじゃらねっと

ダイエット中プログラマのブログ

Google App Engine で全文検索を実装してみる


以前下記のURLでAppEngineでの全文検索についてあれこれ考える記事を書いたけど、実際に実装した例を載せてみる。
http://d.hatena.ne.jp/piro_suke/20100721/1279643738


検証してないけどあまり多くのデータを検索対象にするには向いてないと思う。
1ユーザーのデータとか、数人のプロジェクトのデータから検索するような事を想定。
また、検索文字列が大きかったり、一度にインデックス化する件数を増やす場合は
タスクキューを使う必要がありそう。

以下は、プロジェクト別でツイートできるサービスで、そのプロジェクト内のツイートからキーワード検索を行なう例。


models.py

from google.appengine.ext import db
# pyshibazukeを呼び出すシリアライザ
import projetter.serializer as serializer
class CronSchedule(db.Model):
"""Cronサービスで処理を開始するエンティティのキーを管理するクラス
    key_name: 処理内容をあらわす一意の文字列
    last: 前回最後に処理したキー名
    """
last = db.StringProperty()
class Project(db.Model):
"""プロジェクトモデル
    key_name: pid
    pid: プロジェクトID
    name: プロジェクト名
    """
pid = db.StringProperty(required=True)
name = db.StringProperty(required=True)
class UserTweet(db.Model):
"""ツイートモデル
    key_name: twid
    twid: ツイートID
    pid: プロジェクトID
    uid: 発言ユーザーID
    mesg: 発言内容
    idate: 発言日時
    """
twid = db.StringProperty(required=True)
pid = db.StringProperty()
uid = db.StringProperty(required=True)
mesg = db.TextProperty(required=True)
idate = db.DateTimeProperty()
class TweetSearchIndex(db.Model):
"""ツイートの検索用インデックス
    key_name: (pid)/(term)/(page from 1)
    pid: プロジェクトID
    term: ngram分割された文字列
    twmap: termが含まれるツイートのtwidをキーとし、出現位置のリストを値とする辞書をシリアライズしたデータ
    is_fill: twmapのサイズが最大値に達しているかどうかを判定するフラグ
    """
pid = db.StringProperty()
term = db.StringProperty()
twmap = db.BlobProperty()
is_full = db.BooleanProperty(default=False)
def set_twmap(self, twmap_dict, compress=True):
self.twmap = serializer.dumps(twmap_dict, compress)
def get_twmap(self):
return serializer.loads(self.twmap) if self.twmap else None


インデックス化処理(cronサービスでcreate_tweet_searc_index()を呼び出す)

def create_tweet_search_index():
# cronスケジュールから前回処理した最後のtwidを取得
schedule = _get_or_create_cron_schedule(CRON_KEY_NGRAM)
last_twid = schedule.last or None
# データストアから最後にインデックス化したものより新しいツイートを3件取得(別途定義)
tweet_list = get_all_tweet_list_from_oldest(newest_id=last_twid, limit=3)
if tweet_list:
# 取得した各ツイートのインデックス化を実行
map(lambda tweet: _create_tweet_ngram(tweet["twid"]), tweet_list)
# cronスケジュールに最後のツイートのtwidを保存
_update_cron_schedule(CRON_KEY_NGRAM, tweet_list[-1]["twid"])
def _create_tweet_ngram(twid):
"""インデックス化処理"""
tweet = UserTweet.get_by_key_name(twid)
if tweet:
pid = tweet.pid
# ツイートをngramに分割し、ngram文字列をキーとして出現位置のリストを値とする辞書を受けとる
term_dict = _create_term_dict(tweet.mesg, NGRAM_LENGTH)
# ツイートのngram文字列のリストを取得
term_list = term_dict.keys()
# 各ngram文字列に対応する検索インデックスを取得
search_index_list = TweetSearchIndex.get_by_key_name(["%s/%s/1" % (pid, term) for term in term_list])
# 検索インデックスに今回のツイートのtwidと出現位置を追加
updated = []
for i, search_index in enumerate(search_index_list):
search_index_page = 1
if search_index:
term = search_index.term
while search_index and search_index.is_full:
search_index_page += 1
search_index = TweetSearchIndex.get_by_key_name("%s/%s/%s" % (pid, term, search_index_page))
if search_index:
twmap = search_index.get_twmap()
twmap[twid] = term_dict[search_index.term]
if search_index is None:
search_index_key = "%s/%s/%s" % (pid, term_list[i], search_index_page)
twmap = {twid: term_dict[term_list[i]]}
search_index = TweetSearchIndex(key_name=search_index_key)
search_index.pid = pid
search_index.term = term_list[i]
search_index.is_full = False
search_index.set_twmap(twmap)
# update is_full
if len(search_index.twmap) >= NGRAM_MAX_MAP_LENGTH:
search_index.is_full = True
updated.append(search_index)
db.put(updated)
def _create_term_dict(s, ngram_length):
"""
    文字列を指定のngram長に分割し、
    ngram文字列をキーとして出現位置をリストとして持つ辞書を生成して返す
    """
d = {}
s = s.replace("\r", "")
s = s.replace("\n", "")
s = s.lower()
for i,c in enumerate(s):
term = s[i:i+ngram_length]
if len(term) == ngram_length:
if d.has_key(term):
d[term].append(i)
else:
d[term] = [i]
return d
def _update_cron_schedule(cron_id, last):
"""cronスケジュールを更新"""
schedule = CronSchedule.get_or_insert(key_name=cron_id, last="")
schedule.last = last
schedule.put()


検索処理(検索リクエストが来たらsearch_project_tweet_list()を呼び出す)

TWEETS_PER_PAGE = 20 # 検索ページ毎の表示件数
NGRAM_LENGTH = 2 # ngram長。今回は2-gram。
NGRAM_MAX_MAP_LENGTH = 900000 # 適切な値か不明
CRON_KEY_NGRAM = "create_ngramindex" # cronスケジュールキー
def search_project_tweet_list(pid, search_words, oldest_id=None, limit=TWEETS_PER_PAGE):
"""
    pid: プロジェクトのキー
    search_words: スペース区切りの検索文字列
    oldest_id: 前ページの最後のツイートキー
    limit: 返すツイートの数
    """
# 検索文字列をスペースで分けて処理
search_word_list = search_words.split(" ")
twid_set = set([])
for i, search_word in enumerate(search_word_list):
# 検索文字列を含むtwidのリストを取得
twid_list = _search_project_tweet_by_keyword(pid, search_word)
if i:
# twid_setとtwid_listの両方に含まれてるものだけを残す
twid_set.intersection_update(twid_list)
else:
twid_set.update(twid_list)
twid_list = list(twid_set)
tweet_list = []
if twid_list:
# twidリストをソート(twidが辞書順になっていることを想定)
twid_list = sorted(twid_list, reverse=True) # TODO cache this list
if oldest_id:
twid_list = filter(lambda x: x < oldest_id, twid_list)
twid_list = twid_list[:limit]
tweet_list = UserTweet.get_by_key_name(twid_list)
return tweet_list
def _search_project_tweet_by_keyword(pid, word):
"""wordを含むツイートのtwidリストを取得"""
# 検索文字列をngramに分割
term_list = _create_term_list(word, NGRAM_LENGTH)
# 各ngramに対応する検索インデックスを取得
search_index_list = TweetSearchIndex.get_by_key_name(["%s/%s/1" % (pid, term) for term in term_list])
search_index_list = filter(lambda x: x is not None, search_index_list)
if len(term_list) > search_index_list:
return []
# 検索文字列と同じ順番でngramが出現するtwidのリストを検索インデックスから抽出
matching_twmap_dict = {}
for search_index in search_index_list:
twmap = search_index.get_twmap()
search_index_page = 1
term = search_index.term
while search_index and search_index.is_full:
search_index_page += 1
search_index = TweetSearchIndex.get_by_key_name("%s/%s/%s" % (pid, term, search_index_page))
if search_index:
twmap.update(search_index.get_twmap())
if not matching_twmap_dict: # first turn
matching_twmap_dict = twmap
else:
remaining_twid_list = matching_twmap_dict.keys()
for twid in remaining_twid_list:
if not twmap.has_key(twid):
del matching_twmap_dict[twid]
else:
point_list = filter(lambda x: x-1 in matching_twmap_dict[twid], twmap[twid])
if point_list:
matching_twmap_dict[twid] = point_list
else:
del matching_twmap_dict[twid]
return matching_twmap_dict.keys()
def _create_term_list(s, ngram_length):
"""文字列を指定のngram長で分割"""
term_list = []
s = s.replace("\r", "")
s = s.replace("\n", "")
s = s.lower()
for i,c in enumerate(s):
term = s[i:i+ngram_length]
if len(term) == ngram_length:
term_list.append(term)
return term_list