ほんじゃらねっと

ダイエット中プログラマのブログ

Pocketからエクスポートしたブックマークをデータベースに取込むスクリプト

自分のブックマークを色々分析できるように、PocketのOptionsページから
エクスポートしたブックマークのリストをデータベースに取込む。

PythonでMySQLdbとBeautifulSoup4を使用。

テーブル

CREATE TABLE bookmarks (
id integer NOT NULL AUTO_INCREMENT,
title varchar(255),
url text,
created datetime,
PRIMARY KEY (id)
);

スクリプト(pocket_export2db.py)

!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import MySQLdb
from datetime import *
from bs4 import BeautifulSoup
DB_INFO = {
"host": "DBホスト名",
"db": "DB名",
"user": "DBユーザー名",
"passwd": "DBパスワード",
"charset": "DB文字コード"
}
def main():
argvs = sys.argv
if len(argvs) != 2:
print "Usage: python %s export_file_path" % argvs[0]
quit()
file_path = argvs[1]
soup = BeautifulSoup(open(file_path), from_encoding="utf-8")
connect = MySQLdb.connect(**DB_INFO)
connect.cursorclass = MySQLdb.cursors.DictCursor
cursor = connect.cursor()
for link in soup.find_all("a"):
href = link.get("href")
time_added = link.get("time_added")
tags = link.get("tags")
title = link.get_text()
print title
created = datetime.fromtimestamp(int(time_added))
res = cursor.execute("INSERT INTO bookmarks(title, url, created) VALUES(%s, %s, %s)", (title, href, created))
connect.commit()
cursor.close()
connect.close()
if __name__ == '__main__':
main()

使い方

python pocket_export2db.py エクスポートしたファイルのパス

Apacheのログからアクセス数上位20件のIPアドレスを表示する

こちらを参考にしました
http://www.freia.jp/taka/blog/356

import re
f = open("error_log")
pattern = re.compile(r"\[client (?P<ip>[0-9\.]+)\]")
ips = {}
for line in f:
m = pattern.search(line)
if m:
ip = m.group("ip")
ips[ip] = ips.get(ip, 0) + 1
d = [(v,k) for k,v in ips.items()]
d.sort()
d.reverse()
total = 0
for count, ip in d[:20]:
total += count
print count, ip
print "total: %s" % total

Foursquare API のOAuth認証用クラス

FoursquareAPIを使って場所系のサービスでも作ってやろう、
と思い立って作っている。


APIドキュメントのRate Limitingを読んだところ、
認証済みの場合は認証ユーザー毎、認証していない場合はIPアドレス毎に
1メソッドにつき1時間200アクセスまでと制限されているらしい。
TwitterAPIよりもGoogle App Engineで利用するのは大変じゃなさそう。


ドキュメントはこちら。
http://groups.google.com/group/foursquare-api/web/api-documentation


認証処理はOAuthなのでTwitterの場合とほとんど同じ。
僕はTwitterでAppEngine-OAuth-Libraryを使っていたのだけど、
このライブラリのOAuthClientクラスを継承してFoursquare用のクライアント
クラスを作成して使っている。各URLを変更しただけのもの。


AppEngine-OAuth-Library
http://github.com/mikeknapp/AppEngine-OAuth-Library


作ったクライアントクラスはこんな感じ

foursquare_oauth.py

import logging
from django.utils import simplejson as json
from oauth import OAuthClient
def get_oauth_client(service, key, secret, callback_url):
return FoursquareClient(key, secret, callback_url)
class FoursquareClient(OAuthClient):
def __init__(self, consumer_key, consumer_secret, callback_url):
"""Constructor."""
OAuthClient.__init__(self,
"foursquare",
consumer_key,
consumer_secret,
"http://foursquare.com/oauth/request_token",
"http://foursquare.com/oauth/access_token",
callback_url)
def get_authorization_url(self):
"""Get Authorization URL."""
token = self._get_auth_token()
return "http://foursquare.com/oauth/authorize?oauth_token=%s" % token
def _lookup_user_info(self, access_token, access_secret):
"""Lookup User Info.
        Lookup the user on Foursquare.
        """
response = self.make_request(
"http://api.foursquare.com/v1/user.json",
token=access_token, secret=access_secret, protected=True)
data = json.loads(response.content)
user_info = data["user"]
return user_info

OAuthの具体的な処理については、また機会があったらまとめたい。

re.finditerで正規表現マッチしたパターン毎にグループ化

分かりにくいタイトルだけど。


1つのファイル内に同じパターンにマッチする箇所が複数ある場合に、
個々の箇所でデータを名前つきグループ化して抽出したい場合に使える。


例えば、HTMLファイルの中からテーブルの各行から個々の列の値を取り出す時とか。


パターンの例

<tr>
<td>(?P<name>.*)</td>
<td>(?P<owner>.*)</td>
<td>(?P<zip>.*)</td>
<td>(?P<address>.*)</td>
<td>(?P<tel>.*)</td>
<td>(?P<fax>.*)</td>
</tr>


コードの例

SOURCE_FILE = "./test_source.html"
PATTERN_FILE = "./test_pattern.html"
OUTPUT_FILE = "./test_output.html"
COLUMNS = [
"name",
"owner",
"zip",
"address",
"tel",
"fax",
]
import re
def main():
pattern_data = open(PATTERN_FILE).read()
source_data = open(SOURCE_FILE).read()
p = re.compile(pattern_data)
f = open(OUTPUT_FILE, "w")
for m in p.finditer(source_data):
if m:
d = m.groupdict()
f.write(",".join([d[name] for name in COLUMNS if d.has_key(name)]))
f.write("\n")
f.close()
print "%s saved." % OUTPUT_FILE
if __name__ == "__main__":
main()

関数の呼び出しログを取る

デコレータ便利。この例では呼び出しと引数だけをログしてるけど、
処理にかかった時間の計測とか、色々できそう。


log.py

import logging
def log_api_call(func):
def decorator(*args, **kwargs):
logging.info("API: %s called. params: %s %s" % (func.func_name, str(args)[:200], str(kwargs)[:200]))
return func(*args,**kwargs)
return decorator


api.py

from project1.api.log import log_api_call
@log_api_call
def api1(arg1):
...

lxmlでHTMLページタイトル取得

一応動くサンプル。あんまりテストしてないので、取得できないパターンはあるかも。


覚えておきたいのはxpathの設定方法で、
どうやら

//head/title/text()

と書くとたまにうまくタイトルが取得できない場合があるようで、
そんな場合でも

//title/text()

という風に直接タグを呼び出したら取得できた。

import re
import urllib2
from lxml import etree
def get_title_from_url(url, lxml_tree=None):
if lxml_tree is None:
lxml_tree = get_parsetree_from_url(url)
if lxml_tree is None:
return None
title = None
title_block = lxml_tree.xpath("//title/text()")
if title_block:
title = title_block[0]
title = re.compile("\r|\n", re.S|re.M).sub("", title)
charset = guess_content_encoding(title)
if charset is not None:
title = title.decode(charset).encode("utf-8")
if title is None:
title = "Page Title not found"
return title
def get_parsetree_from_url(url):
if url is None:
return None
try:
html = urllib2.urlopen(url)
except urllib2.HTTPError, e:
return None
html_data = html.read()
charset = html.headers.getparam('charset')
if charset is None:
charset = guess_content_encoding(html_data)
lxml_tree = etree.fromstring(html_data, parser=etree.HTMLParser(encoding=charset))
return lxml_tree
def guess_content_encoding(html_content):
encoding = None
for i in ["utf-8", "shift_jis", "euc-jp"]:
try:
unicode(html_read_data, i)
encoding = i
break
except Exception, e:
pass
return encoding

正規表現で引用符を検索する

TumblrRSS解析を試みていて、そこでQuote登録した文字列を検索して変換しようとしたところ、苦労したのでメモ。

RSSのソースを見たところ、引用した部分は二重引用符で囲まれているようだったので、普通にダブルクオートで検索してみたけどうまくヒットしなかった。

r'(?P<quote>")'


RSSのソースをよく見てみると、普通のダブルクオートとは異なる、特殊な引用符を使ってるみたい。
ord()で確認したところ、\u8220と\u8221というコードの文字を使用していた。他のダブルクオートと区別するためかな?


そこで、下記の方法を試してみたけど、どちらもダメだった。

r'(?P<quote>〈直接文字をペースト〉)'
r'(?P<quote>\u8220)'


色々調べたら、下記のページで、修飾子としてrではなく、urを使えば良い、と書いてあった。
何やらunicodeをうまくやってくれるらしい。
http://stackoverflow.com/questions/393843/python-and-regular-expression-with-unicode


で、これで試す。

ur'(?P<quote>〈直接文字をペースト〉)'
ur'(?P<quote>\u8220)'


試したら、直接文字をペーストした方はうまくいったけど、下の16進数表記はうまくいかなかった。


さらに調べると、下記のようなページに辿りついた。どうやら、一般的なUnicodeではない?らしい。
http://www.fileformat.info/info/unicode/char/201c/index.htm
http://www.fileformat.info/info/unicode/char/201d/index.htm


親切にPythonで表記する方法も書いてあったので、それを試したところ、うまくいった。

ur'(?P<quote>\u201c)' # 開きクオート
ur'(?P<quote>\u201d)' # 閉じクオート


久しぶりにはまったけど、やっぱり文字コードまわりは大変。

python-bitlyとpython-twitterを使ってTwitterにポストする

Honjalaに新規登録されたブックマークをTwitterに自動投稿するスクリプトを書いた。
ブログやブックマークをTwitterに自動投稿する機会は今後もありそうなのでメモ。

準備

Twitterとbitlyにアカウントをあらかじめ作成しておく。


PythonからTwitter APIを利用するためのライブラリと、
元記事へのリンクを短縮するためにbitlyのPython用ライブラリを使用する。
(Twitter APIにURLを短縮してくれる機能があるとかどこかに書いてあったけど、確認できなかった)
bitlyのライブラリはbitly.pyのみなので、それをsite-packages直下にでも置けばOK。


python-twitter
http://code.google.com/p/python-twitter/


python-bitly
http://code.google.com/p/python-bitly/


Twitterのユーザー名とパスワード、bitlyのユーザー名とAPIキーが必要。
bitlyに登録するとアカウント管理画面でAPIキーが表示されるので、それをメモっておく。

コード

...
import twitter
import bitly
...
twitter_api = twitter.Api(TWITTER_USERNAME, TWITTER_PASSWORD)
bitly_api = bitly.Api(login=BITLY_API_LOGIN, apikey=BITLY_API_KEY)
short_url = bitly_api.shorten("http://d.hatena.ne.jp/piro_suke/")
title = "僕のブログです" # unicode化が必要かも。
tweet = "Bookmark: %s %s" % (title, short_url)
twitter_api.PostUpdate(tweet)

指定ディレクトリの内容を再帰的に削除

また使いそうなので記録。指定ディレクトリ内の.svnやらCVSやらを一括削除するためのスクリプト
バグがあるかもしれないので、削除処理をコメントにして削除対象を確認できるようにしておく。

recursive_delete.py

# -*- coding: utf-8 -*-
import os
import sys
import re
if len(sys.argv) < 3:
print "Usage: python recursive_delete.py ROOT_DIR TARGET"
exit()
root_dir = sys.argv[1]
target_name = sys.argv[2]
if not os.path.exists(root_dir):
print "directory (%s) does not exist." % root_dir
exit()
file_path_list = []
dir_path_list = []
pattern = re.compile(target_name, re.I)
for root, dirs, files in os.walk(root_dir):
for f in files:
target_path = os.path.join(root, f)
if pattern.search(target_path):
file_path_list.append(target_path)
for d in dirs:
target_path = os.path.join(root, d)
if pattern.search(target_path):
dir_path_list.append(target_path)
for target_path in file_path_list:
print "removing file %s" % target_path
#os.remove(target_path)
dir_path_list.reverse()
for target_path in dir_path_list:
print "removing dir %s" % target_path
#os.rmdir(target_path)

lxmlで日本語のWebページのタイトルを取得する

日本語が化けて大変苦労したのでメモ。
結論として、XML(またはHTML)を解析する前にunicode関数に通しておく、
ということで良いのかな?
相変わらず文字コード関連はよく分からない。

from urllib import urlopen
from lxml import etree
html = urlopen("http://b.hatena.ne.jp")
charset = html.headers.getparam('charset')
html_data = unicode(html.read(),charset)
et = etree.fromstring(html_data, parser=etree.HTMLParser())
title_element = et.xpath("./head/title")[0]
title = title_element.text.encode("utf-8")
print title # UTF-8に変換されたタイトルが出力される


どうやら上記の使い方はencoding宣言付きのXMLではうまくいかないみたい。
その場合は、unicode変換をしなければうまくいった。
うむむ。

置換処理の置換後の文字列の代わりに関数を指定する

今後も使えそうなのでメモ。

import re
from django.core.exceptions import ObjectDoesNotExist
from sample.web.models import WebPage
PATTERN = r'<a href="%(dummy_url)s" >%(real_url)s</a>'
HTML_TEMPLATE = re.compile(r"((http|https)://[-_.!~*\'()a-zA-Z0-9;\/?:\@&=+\$,%#]+)", re.I)
def substitute_url(m):
url = m.group(0)
webpage = None
try:
webpage = WebPage.objects.get(url=url)
except ObjectDoesNotExist:
webpage = WebPage()
webpage.url = url
webpage.save()
return PATTERN % {"real_url": url, "dummy_url": "/bookmark/%s/" % (webpage.id,),}
print HTML_TEMPLATE.sub(substitute_url, "http://b.hatena.ne.jp/ が面白かったので紹介します。")

Pythonでコマンドプロンプトを拡張

WindowsコマンドプロンプトPythonで拡張するwxPythonアプリケーションを作った。
Linuxシェルでも動くかもしれないが、試していない。


pythonスクリプトと、シェルコマンドが使える。コマンドを入力すると、
最初に指定ディレクトリ内のpythonスクリプトを検索し、存在しない場合は
シェルコマンドとして実行する。


まだcdコマンドしか作っていないので、正しく動作するかどうか。
いつかちゃんと作って、タブ機能も実装しよう。


多少修正した。
下記のページを参考に文字コード取得メソッドを作らせていただいた。
http://www.freia.jp/taka/blog/571


pyrompt.py

# coding: utf-8
import os
import re
import sys
import subprocess
import wx
class Pyrompt(wx.Frame):
LEFT_PROMPT_END = ">"
KEY_CTRL = -96
def __init__(self, parent, id, title):
"""初期化処理"""
# 画面初期化
wx.Frame.__init__(self, parent, -1, title, size = (500, 400))
box = wx.BoxSizer(wx.VERTICAL)
# メニューバー作成
menu_bar = wx.MenuBar()
file_menu = wx.Menu()
quit_menuitem = wx.MenuItem(file_menu, 105, '&Quit', 'Quit Pyrompt')
file_menu.AppendItem(quit_menuitem)
menu_bar.Append(file_menu, "&File")
self.SetMenuBar(menu_bar)
# タブ作成
notebook = wx.Notebook(self, -1, style=wx.RIGHT)
tab1 = wx.Panel(notebook, -1)
tab1.SetFocus()
notebook.AddPage(tab1, 'Prompt1')
box.Add(notebook, 1, wx.EXPAND)
# プロンプト作成
tab_box = wx.BoxSizer(wx.VERTICAL)
self.view = wx.TextCtrl(tab1, -1, '', size=(-1, -1), style=wx.TE_MULTILINE | wx.TE_PROCESS_ENTER | wx.TE_CHARWRAP)
self.view.SetEditable(False)
tab_box.Add(self.view, 1, wx.EXPAND)
tab1.SetSizer(tab_box)
# イベント設定
self.Bind(wx.EVT_CLOSE, self.QuitApplication)
self.Bind(wx.EVT_MENU, self.QuitApplication, id=105)
self.view.Bind(wx.EVT_CHAR, self.OnTextEnter)
self.Centre()
self.Show(True)
self.view.SetValue(self.get_left_prompt())
self.current_line_number = 0
self.start_position = len(self.LEFT_PROMPT_END)
# コマンドロード
self.load_commands()
# ホームディレクトリに移動
os.chdir(wx.GetHomeDir())
def OnTextEnter(self, event):
"""テキスト入力イベント"""
keycode = event.GetKeyCode()
view = event.GetEventObject()
mousePoint = view.GetInsertionPoint()
(x,y) = view.PositionToXY(mousePoint)
if keycode in (wx.WXK_RETURN, wx.WXK_NUMPAD_ENTER):
end_position = view.GetInsertionPoint()
command = view.GetRange(self.start_position, end_position)
view.AppendText("\n")
self.execute_command(command)
view.AppendText(self.get_left_prompt())
(x,y) = view.PositionToXY(view.GetInsertionPoint())
self.current_line_number = y
self.start_position = view.GetInsertionPoint()
else:
if keycode in range(32,127): # visible characters
view.WriteText(chr(keycode))
elif keycode == wx.WXK_BACK: # backspace
if self.current_line_number < y or len(self.get_left_prompt()) < x:
view.Remove(mousePoint-1,mousePoint)
elif keycode == wx.WXK_DELETE: # delete
view.Remove(mousePoint,mousePoint+1)
elif keycode == wx.WXK_LEFT: # left key
if self.current_line_number < y or len(self.get_left_prompt()) < x:
view.SetInsertionPoint(view.XYToPosition(x-1,y))
elif keycode == wx.WXK_RIGHT: # right key
line_length = view.GetLineLength(y)
if x < line_length:
view.SetInsertionPoint(view.XYToPosition(x+1,y))
elif keycode == wx.WXK_UP: # up key
pass
elif keycode == wx.WXK_DOWN: # down key
pass
elif keycode == wx.WXK_HOME: # home key
if y == self.current_line_number:
view.SetInsertionPoint(view.XYToPosition(len(self.get_left_prompt()), y))
else:
view.SetInsertionPoint(view.XYToPosition(0, y))
elif keycode == wx.WXK_END: # end key
view.SetInsertionPoint(view.XYToPosition(view.GetLineLength(y), y))
elif keycode in range(1,27): # Ctrl+a-z
pass
def execute_command(self, statement):
"""pythonスクリプトコマンドを実行"""
(command, arg_list) = self.split_statement(statement)
if command:
if self.command_list.has_key(command):
(res, message) =self.command_list[command].execute(self, arg_list)
self.view.AppendText(message)
self.view.AppendText("\n")
else:
res = self.execute_oscommand(statement)
self.view.AppendText("RETURN CODE: %d" % (res,))
self.view.AppendText("\n")
def execute_oscommand(self, command):
"""OSコマンドを実行"""
p = subprocess.Popen(command, shell=True, cwd=os.getcwd(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
(stdouterr, stdin) = (p.stdout, p.stdin)
while True:
line = stdouterr.readline()
if not line:
break
line_data = line.rstrip()
self.view.AppendText(unicode(line_data, self.get_charset(line_data)).encode("utf-8"))
self.view.AppendText("\n")
res = p.wait()
return res
def QuitApplication(self, event):
"""確認してアプリケーションを終了"""
dlg = wx.MessageDialog(self, "Are you sure you want to Exit?", '', wx.YES_NO | wx.YES_DEFAULT | wx.CANCEL | wx.ICON_QUESTION)
val = dlg.ShowModal()
if val == wx.ID_YES:
self.exit_app()
elif wx.ID_CANCEL:
dlg.Destroy()
else:
dlg.Destroy()
def exit_app(self):
"""アプリケーションを終了"""
wx.Exit()
def split_statement(self, statement):
"""入力をコマンドと引数リストに分割"""
statement = statement.strip()
command = None
arg_list = []
if statement:
m = re.search(r"^([a-zA-Z0-9_]+) (.+)", statement)
if m:
command = m.group(1)
arg_list = self.split_args(m.group(2))
else:
command = statement
return (command, arg_list)
def split_args(self, args):
"""引数をリストに分割。クオートを考慮"""
arg_list = []
if args:
arg_buffer = ""
in_quote = False
in_squote = False
in_dquote = False
for c in args:
if c != '"' and c != "'" and c != " ":
arg_buffer = arg_buffer + c
elif c == " " and in_quote:
arg_buffer = arg_buffer + c
elif c == " ":
arg_list.append(arg_buffer)
arg_buffer = ""
if c == '"':
in_dquote = not in_dquote
elif c == "'":
in_squote = not in_squote
if in_squote or in_dquote:
in_quote = True
else:
in_quote = False
if len(arg_buffer) > 0:
arg_list.append(arg_buffer)
return arg_list
def load_commands(self):
"""pythonコマンドをロード"""
try:
self.command_list = {}
file_name_list = os.listdir(os.path.join(os.path.dirname(os.path.abspath(__file__)), "pybin"))
for name in file_name_list:
m = re.search(r"([a-zA-Z0-9]+).py", name)
if m and name != "__init__.py":
cmd_name = m.group(1)
self.command_list[cmd_name] = self.import_module("pybin.%s" % cmd_name)
except Exception, e:
self.message_dlg(e)
def import_module(self, name):
"""モジュールを動的にインポート"""
mod = __import__(name)
components = name.split('.')
for comp in components[1:]:
mod = getattr(mod, comp)
return mod
def get_left_prompt(self):
"""プロンプトを表示"""
return self.LEFT_PROMPT_END
def message_dlg(self, message):
"""デバッグ用ダイアログ表示"""
dlg = wx.MessageDialog(self, message, "Debug", wx.OK)
dlg.ShowModal()
dlg.Destroy()
def get_charset(self, data):
"""文字コードを判別"""
f = lambda d, enc: d.decode(enc) and enc
try:
return f(data, 'utf-8')
except:
pass
try:
return f(data, 'shift-jis')
except:
pass
try:
return f(data, 'euc-jp')
except:
pass
try:
return f(data, 'iso2022-jp')
except:
pass
return None
app = wx.App()
Pyrompt(None, -1, 'Pyrompt ver.1.0')
app.MainLoop()


pybin/cd.py

import os
def execute(app, arg_list):
if arg_list:
if len(arg_list) == 1:
dirpath = arg_list[0]
if os.path.exists(dirpath):
os.chdir(dirpath)
message = "Moved to %s" % dirpath
return (0,message)
else:
message = "Directory does not exist: %s" % dirpath
return (1,message)
else:
message = "Incorrect argument: %s" % (",".join(arg_list))
return (1,message)
else:
message = "Destination dir required."
return (1,message)

Pythonで大容量ファイルを指定行数で複数ファイルに分割する

これもちょっと必要だったので作成したスクリプト。

秀丸gVimで開けないほど大きいファイルを分割する。

split_file.py

# -*- coding: utf-8 -*-

in_file_name = "verybigfile.log"
out_file_name_template = "splitted_%d.log"

max_lines = 1000000

split_index = 1
line_index = 1
out_file = open(out_file_name_template % (split_index,), "w")
in_file = open(in_file_name)
line = in_file.readline()
while line:
    if line_index > max_lines:
        print "Starting file: %d" % split_index
        out_file.close()
        split_index = split_index + 1
        line_index = 1
        out_file = open(out_file_name_template % (split_index,), "w")
    out_file.write(line)
    line_index = line_index + 1
    line = in_file.readline()

out_file.close()
in_file.close()