以前下記のURLでAppEngineでの全文検索についてあれこれ考える記事を書いたけど、実際に実装した例を載せてみる。
http://d.hatena.ne.jp/piro_suke/20100721/1279643738
検証してないけどあまり多くのデータを検索対象にするには向いてないと思う。
1ユーザーのデータとか、数人のプロジェクトのデータから検索するような事を想定。
また、検索文字列が大きかったり、一度にインデックス化する件数を増やす場合は
タスクキューを使う必要がありそう。
以下は、プロジェクト別でツイートできるサービスで、そのプロジェクト内のツイートからキーワード検索を行なう例。
models.py
from google.appengine.ext import db # pyshibazukeを呼び出すシリアライザ import projetter.serializer as serializer class CronSchedule(db.Model): """Cronサービスで処理を開始するエンティティのキーを管理するクラス key_name: 処理内容をあらわす一意の文字列 last: 前回最後に処理したキー名 """ last = db.StringProperty() class Project(db.Model): """プロジェクトモデル key_name: pid pid: プロジェクトID name: プロジェクト名 """ pid = db.StringProperty(required=True) name = db.StringProperty(required=True) class UserTweet(db.Model): """ツイートモデル key_name: twid twid: ツイートID pid: プロジェクトID uid: 発言ユーザーID mesg: 発言内容 idate: 発言日時 """ twid = db.StringProperty(required=True) pid = db.StringProperty() uid = db.StringProperty(required=True) mesg = db.TextProperty(required=True) idate = db.DateTimeProperty() class TweetSearchIndex(db.Model): """ツイートの検索用インデックス key_name: (pid)/(term)/(page from 1) pid: プロジェクトID term: ngram分割された文字列 twmap: termが含まれるツイートのtwidをキーとし、出現位置のリストを値とする辞書をシリアライズしたデータ is_fill: twmapのサイズが最大値に達しているかどうかを判定するフラグ """ pid = db.StringProperty() term = db.StringProperty() twmap = db.BlobProperty() is_full = db.BooleanProperty(default=False) def set_twmap(self, twmap_dict, compress=True): self.twmap = serializer.dumps(twmap_dict, compress) def get_twmap(self): return serializer.loads(self.twmap) if self.twmap else None
インデックス化処理(cronサービスでcreate_tweet_searc_index()を呼び出す)
def create_tweet_search_index(): # cronスケジュールから前回処理した最後のtwidを取得 schedule = _get_or_create_cron_schedule(CRON_KEY_NGRAM) last_twid = schedule.last or None # データストアから最後にインデックス化したものより新しいツイートを3件取得(別途定義) tweet_list = get_all_tweet_list_from_oldest(newest_id=last_twid, limit=3) if tweet_list: # 取得した各ツイートのインデックス化を実行 map(lambda tweet: _create_tweet_ngram(tweet["twid"]), tweet_list) # cronスケジュールに最後のツイートのtwidを保存 _update_cron_schedule(CRON_KEY_NGRAM, tweet_list[-1]["twid"]) def _create_tweet_ngram(twid): """インデックス化処理""" tweet = UserTweet.get_by_key_name(twid) if tweet: pid = tweet.pid # ツイートをngramに分割し、ngram文字列をキーとして出現位置のリストを値とする辞書を受けとる term_dict = _create_term_dict(tweet.mesg, NGRAM_LENGTH) # ツイートのngram文字列のリストを取得 term_list = term_dict.keys() # 各ngram文字列に対応する検索インデックスを取得 search_index_list = TweetSearchIndex.get_by_key_name(["%s/%s/1" % (pid, term) for term in term_list]) # 検索インデックスに今回のツイートのtwidと出現位置を追加 updated = [] for i, search_index in enumerate(search_index_list): search_index_page = 1 if search_index: term = search_index.term while search_index and search_index.is_full: search_index_page += 1 search_index = TweetSearchIndex.get_by_key_name("%s/%s/%s" % (pid, term, search_index_page)) if search_index: twmap = search_index.get_twmap() twmap[twid] = term_dict[search_index.term] if search_index is None: search_index_key = "%s/%s/%s" % (pid, term_list[i], search_index_page) twmap = {twid: term_dict[term_list[i]]} search_index = TweetSearchIndex(key_name=search_index_key) search_index.pid = pid search_index.term = term_list[i] search_index.is_full = False search_index.set_twmap(twmap) # update is_full if len(search_index.twmap) >= NGRAM_MAX_MAP_LENGTH: search_index.is_full = True updated.append(search_index) db.put(updated) def _create_term_dict(s, ngram_length): """ 文字列を指定のngram長に分割し、 ngram文字列をキーとして出現位置をリストとして持つ辞書を生成して返す """ d = {} s = s.replace("\r", "") s = s.replace("\n", "") s = s.lower() for i,c in enumerate(s): term = s[i:i+ngram_length] if len(term) == ngram_length: if d.has_key(term): d[term].append(i) else: d[term] = [i] return d def _update_cron_schedule(cron_id, last): """cronスケジュールを更新""" schedule = CronSchedule.get_or_insert(key_name=cron_id, last="") schedule.last = last schedule.put()
検索処理(検索リクエストが来たらsearch_project_tweet_list()を呼び出す)
TWEETS_PER_PAGE = 20 # 検索ページ毎の表示件数 NGRAM_LENGTH = 2 # ngram長。今回は2-gram。 NGRAM_MAX_MAP_LENGTH = 900000 # 適切な値か不明 CRON_KEY_NGRAM = "create_ngramindex" # cronスケジュールキー def search_project_tweet_list(pid, search_words, oldest_id=None, limit=TWEETS_PER_PAGE): """ pid: プロジェクトのキー search_words: スペース区切りの検索文字列 oldest_id: 前ページの最後のツイートキー limit: 返すツイートの数 """ # 検索文字列をスペースで分けて処理 search_word_list = search_words.split(" ") twid_set = set([]) for i, search_word in enumerate(search_word_list): # 検索文字列を含むtwidのリストを取得 twid_list = _search_project_tweet_by_keyword(pid, search_word) if i: # twid_setとtwid_listの両方に含まれてるものだけを残す twid_set.intersection_update(twid_list) else: twid_set.update(twid_list) twid_list = list(twid_set) tweet_list = [] if twid_list: # twidリストをソート(twidが辞書順になっていることを想定) twid_list = sorted(twid_list, reverse=True) # TODO cache this list if oldest_id: twid_list = filter(lambda x: x < oldest_id, twid_list) twid_list = twid_list[:limit] tweet_list = UserTweet.get_by_key_name(twid_list) return tweet_list def _search_project_tweet_by_keyword(pid, word): """wordを含むツイートのtwidリストを取得""" # 検索文字列をngramに分割 term_list = _create_term_list(word, NGRAM_LENGTH) # 各ngramに対応する検索インデックスを取得 search_index_list = TweetSearchIndex.get_by_key_name(["%s/%s/1" % (pid, term) for term in term_list]) search_index_list = filter(lambda x: x is not None, search_index_list) if len(term_list) > search_index_list: return [] # 検索文字列と同じ順番でngramが出現するtwidのリストを検索インデックスから抽出 matching_twmap_dict = {} for search_index in search_index_list: twmap = search_index.get_twmap() search_index_page = 1 term = search_index.term while search_index and search_index.is_full: search_index_page += 1 search_index = TweetSearchIndex.get_by_key_name("%s/%s/%s" % (pid, term, search_index_page)) if search_index: twmap.update(search_index.get_twmap()) if not matching_twmap_dict: # first turn matching_twmap_dict = twmap else: remaining_twid_list = matching_twmap_dict.keys() for twid in remaining_twid_list: if not twmap.has_key(twid): del matching_twmap_dict[twid] else: point_list = filter(lambda x: x-1 in matching_twmap_dict[twid], twmap[twid]) if point_list: matching_twmap_dict[twid] = point_list else: del matching_twmap_dict[twid] return matching_twmap_dict.keys() def _create_term_list(s, ngram_length): """文字列を指定のngram長で分割""" term_list = [] s = s.replace("\r", "") s = s.replace("\n", "") s = s.lower() for i,c in enumerate(s): term = s[i:i+ngram_length] if len(term) == ngram_length: term_list.append(term) return term_list