ほんじゃらねっと

ダイエット中プログラマのブログ

Google App Engine でエンティティの一括削除の方法を考えてみる

EvernoteのAPIを使ったWebアプリをappengine上に作ろうとしてる。
たまに既存のデータを全部消してフル同期する必要が発生しそうなので、
なんとかこれを最小のコストで終らせたい。


ついでに汎用的な一括削除の仕組みができたら再利用できて嬉しいな、と思って、
あれこれ考えて試作してみた。


調査したところ、

  • TaskQueueを使えば並列で処理が行なえて、しかも同時に実行した処理は1回分のCPU時間しか消費しないらしい
  • memcacheのincr、decrを使えばタスクが残っているかどうかが判定できそう
  • db.deleteやdb.putを使うと、ループでエンティティからputやdeleteを実行するより速いらしい
  • Key.from_pathでキーだけでdb.deleteした方が速いらしい

ということが分かった。


TaskQueueが要で、これにどうやって削除するエンティティを振り分けるかがポイントになりそう。
削除対象が重複したら意味ないしね。


今回は、KeyBankというユーザー毎の全キー名を管理するモデルを作り、
このKeyBankにTaskQueue毎の削除対象キーリストを振り分けさせる方法を試してみた。


KeyBankエンティティはユーザーIDをキーとしてユーザー毎にエンティティを作る。

KeyBankの子モデルとしてKeyChunkというモデルを作成し、
このKeyChunkにインデックス番号付きのキーを付けてそれぞれ上限つき(100個とか)で
キーのリストを持たせる。この上限は一回で処理する最小単位にするのだけど、まだ何個が
正解か分からない。
キーのリストはBlobPropertyに圧縮して保存する。

KeyChunkは直接触らず、キーリストの追加や取得はKeyBankを経由して行なう。


例としてノートのキーを管理するKeyBankモデルはこんな感じ。
全部のメソッドをテストしたわけではないし、トランザクションの使い方が合ってるかどうかもあやしいけど。イメージ。

from google.appengine.ext import db
import project1.serializer as serializer # Shibazukeにzlib処理を追加したシリアライザ
class NoteKeyBank(db.Model):
"""key_name is user_id"""
max_list_index = db.IntegerProperty(default=0)
MAX_CHUNK_SIZE = 100
def add_key_list(self, key_list):
user_key = self.key().name()
max_size = self.MAX_CHUNK_SIZE
chunk_num = int(len(key_list)/max_size) + 1
def txn(key_list):
chunk_list = []
for i in range(chunk_num):
sub_key_list = key_list[:max_size]
self.max_list_index += 1
new_chunk = NoteKeyChunk(
parent=self,
key_name="%s/%s" % (user_key, self.max_list_index),
size=len(sub_key_list)
)
new_chunk.set_keys(sub_key_list)
chunk_list.append(new_chunk)
key_list = key_list[max_size:]
db.put(chunk_list)
self.put()
db.run_in_transaction(txn, key_list)
def get_key_list(self, index):
user_key = self.key().name()
chunk = NoteKeyChunk.get_by_key_name("%s/%s" % (user_key, index), parent=self)
if chunk:
return chunk.get_keys()
else:
return None
def remove_key_list(self, index):
user_key = self.key().name()
chunk_key = db.Key.from_path('NoteKeyChunk', "%s/%s" % (user_key, index), parent=self.key())
db.delete(chunk_key)
def clear_list(self):
user_key = self.key().name()
def txn():
chunk_key_list = self.create_key_list(user_key)
db.delete(chunk_key_list)
self.max_list_index = 0
self.put()
db.run_in_transaction(txn)
def get_key_count(self):
user_key = self.key().name()
total = 0
key_list = self.create_key_name_list(user_key)
chunk_list =  NoteKeyChunk.get_by_key_name(key_list, parent=self.key())
for chunk in chunk_list:
if chunk:
total += chunk.size
return total
def get_chunk_count(self):
user_key = self.key().name()
count = 0
chunk_key_list = self.create_key_list(user_key)
for chunk_key in chunk_key_list:
if chunk_key:
count += 1
return count
def create_key_list(self, user_key):
key_name_list = self.create_key_name_list(user_key)
key_list = []
for key_name in key_name_list:
key_list.append(db.Key.from_path('NoteKeyChunk', key_name, parent=self.key()))
return key_list
def create_key_name_list(self, user_key):
key_name_list = []
for i in range(self.max_list_index):
key_name_list.append("%s/%s" % (user_key, i))
return key_name_list
class NoteKeyChunk(db.Model):
"""key_name is en_user_id / index"""
keys = db.BlobProperty()
size = db.IntegerProperty(default=0)
def set_keys(self, keys, compress=False):
self.keys = serializer.dumps(keys, compress)
def get_keys(self):
if self.keys:
return serializer.loads(self.keys)
else:
return None


使う流れとしては、まずノートの登録時にキーのリストをadd_key_listする

key_list = []
...
# APIからノートを取得してデータストアにたくさん追加
# key_listに保存したノートのキー名を入れる
...
key_bank = NoteKeyBank.get_by_key_name(user_key)
if key_bank is None:
key_bank = NoteKeyBank(key_name=user_key)
key_bank.add_key_list(key_list)


これで100個以下のキーリストを持つKeyChunkがインデックス付きで保存され、
KeyBankのmax_list_indexにKeyChunkの最大インデックスが保存される。


で、例えばタスク毎に200個ずつ削除したい場合は、下記のようにタスク生成処理を作る。
ここはもっとキレイに作りたい。


200個ずつ削除したいので、100キー入りKeyChunkのインデックスを2つずつまとめてから
ユーザーキーと一緒にタスクに渡してる。
memcacheには起動するタスクの数を入れておく。

...
def add_clean_notes_task(user_key, chunk_index_list):
clean_queue = Queue("cleaner") # 10/s
clean_queue.add(Task(
params={"key": user_key, "chunks": chunk_index_list},
url="/.../note/clean/"
))
...
key_bank = NoteKeyBank.get_by_key_name(user_key)
if key_bank is not None:
chunks_per_tq = int(200 / NoteKeyBank.MAX_CHUNK_SIZE)
chunk_index_list_list = []
chunk_index_list = []
for i in range(key_bank.max_list_index):
index = i + 1
if len(chunk_index_list) < chunks_per_tq:
chunk_index_list.append(index)
else:
chunk_index_list_list.append(chunk_index_list)
chunk_index_list = []
memcache.set("clean-notes-%s" % user_key, len(chunk_index_list_list))
for chunk_index_list in chunk_index_list_list:
add_clean_notes_task(user_key, chunk_index_list, True)


タスクで呼び出されるビューはこんな感じ

...
class CleanNotesRequestHandler(webapp.RequestHandler):
def post(self):
user_key = self.request.get("key")
chunk_index_list = self.request.get_all("chunks")
if user_key:
key_bank = NoteKeyBank.get_by_key_name(user_key)
for chunk_index in chunk_index_list:
key_name_list = key_bank.get_key_list(chunk_index)
clean_notes(key_name_list) # ノート削除処理
tq_num = memcache.decr("clean-notes-%s" % user_key)
if tq_num == 0:
key_bank.clear_list()
memcache.delete("clean-notes-%s" % user_key)


一応テスト環境ではうまく動作してるように見えるけど...
まだまだ改良は必要そう。後で使えそうな処理が色々入ってたのでひとまずメモしておく。