時間のかかる前処理をDaskで高速化

最近仕事で自然言語処理を使ったモデルを作成していたんですが、 前処理やモデルを作る際に数分〜数十分単位の処理待ちの空き時間が頻繁に発生してしまい、 その度集中力が切れる問題に悩まされていました。

モデルの学習に時間がかかってしまうのはまた別の解決策を考えるとして、 今回は時間のかかる前処理をDaskをつかって高速化した方法をお話ししようと思います。

この記事は PyLadies Advent Calendar 2018 - Adventar の18日目の記事になります。


Daskとは?

Daskとは、numpyやpandasなどのデータを簡単に並列計算するライブラリ。

あまり公式のチュートリアルがわかりやすくない(気がする)ので、基本的な使い方は Python Dask で 並列 DataFrame 処理 - StatsFragments を見てみると良い。

サンプルデータ

今回はKaggleのToxic Comment Classification Challengeのデータを使ってテキストの前処理をしていく。

このコンテスト自体は、誹謗中傷などの6種の良くない表現が含まれているかどうかを当てるというもので、 約16万件の数百文字程度のテキストが入ったデータセットとなっている。

import pandas as pd

df = pd.read_csv('input/train.csv',)
print(f"{df.shape[0]:,d}行")
df.head()
159,571行
id comment_text toxic
0 0000997932d777bf Explanation\nWhy the edits made under my usern... 0
1 000103f0d9cfb60f D'aww! He matches this background colour I'm s... 0
2 000113f07ec002fd Hey man, I'm really not trying to edit war. It... 0
3 0001b41b1c6bb37e "\nMore\nI can't make any real suggestions on ... 0
4 0001d958c54c6e35 You, sir, are my hero. Any chance you remember... 0

通常の名詞抽出処理

今回はcomment_text列に対して、名詞のみを抜き出す前処理をすることを目的としていく。

nltkを使って、名詞を抜き出すような処理を定義する。

import nltk

def get_nouns(sentence):
    tokenized = nltk.word_tokenize(sentence)
    nouns = [word for (word, pos) in nltk.pos_tag(tokenized) if pos[:2] == 'NN'] 
    return nouns

comment_text列に対して上記関数を実行する。

df['comment_text'].head().apply(get_nouns)
0    [Explanation, edits, Hardcore, Metallica, Fan,...
1          [D'aww, colour, Thanks, talk, January, UTC]
2    [Hey, man, war, guy, information, edits, talk,...
3    [suggestions, improvement, section, statistics...
4                                 [hero, chance, page]
Name: comment_text, dtype: object

この処理を全データについてこの処理を実行すると11分かかった。
それほど長い時間ではないが、集中力が行方不明になるには十分な時間である。

Daskを使って処理

この名詞を抜き出す処理を、daskを使って書き換えてみよう。

Daskのインストールは pip install "dask[complete]" でできる。

まずはDataFrameをDaskオブジェクトに変換する。

import dask.dataframe as dd

# Daskに変換(npartitions:分割数)
ddf = dd.from_pandas(df, npartitions=10)

Daskオブジェクトに変換するには、npartitionsというオブジェクトを何分割して処理をするかを指定する引数を設定する必要がある。 基本的にはCPUのコア数までの好きな値を指定すれば良い。

CPUのコア数に設定する場合はいつも以下のように書いている。

import multiprocessing
ddf = dd.from_pandas(df, npartitions=multiprocessing.cpu_count())

ddf.divisionsをみてみると、DataFrameがどのindexで分割されているか確認することができる。

ddf.divisions
(0,
 6649,
 13298,
 19947,
 ...)

実際に名詞抽出関数を適用するコードは以下になる。

#  1行だけ元のSeriesに関数を適用させ、metaデータを作成する
meta = df['comment_text'].head(1).apply(get_nouns)

# metaを引数で指定して、普通に関数をapplyする準備をする
res = ddf['comment_text'].apply(get_nouns, meta=meta)

# 実際にapplyを各partitionに分けて実行し、結合する。
rtn = res.compute(scheduler='processes')

metaというのは指定した関数の返り値の各列がどんな型なのかを指定する引数。
詳しくは Internal Design — Dask 1.0.0 documentation あたりに書いてある。

列と型を指定したり、返り値と同じ型のオブジェクトによって指定することができる。

個人的には、関数を普通に元のDataFrameやSeriesの1行に適用して返ってきたものを指定するのが一番指定が楽という結論に至っているので、1行目の様な処理を行っている。

2行目Daskオブジェクトのapply関数を呼んだ時点では、まだapplyは実行されていない。 実際に処理を実行するのは、3行目のapplyの返り値に対してcomputeを実行したときになる。

今回スケジューラはprocessesにしている。
デフォルトはスレッドになっているが、基本的にはプロセスを指定しておけば良い。

スケジューラについては Scheduling — Dask 1.0.0 documentation で詳細を確認できる。

print(rtn.head())
0    [Explanation, edits, Hardcore, Metallica, Fan,...
1          [D'aww, colour, Thanks, talk, January, UTC]
2    [Hey, man, war, guy, information, edits, talk,...
3    [suggestions, improvement, section, statistics...
4                                 [hero, chance, page]
Name: comment_text, dtype: object

無事Seriesに対するapplyと同じ結果が得られた。
コードとしては3行増えたくらいで難しくはない。

実行速度を24コアのマシンでpandas, dask(npartitions=4), dask(npartitions=8), dask(npartitions=24)で比較した結果が以下になる。

npartitions=8で計算したときに6倍くらい早くなっている。
これなら1分半くらいの待ち時間ですむ。ギリギリ集中力が迷子になる前に終わりそうだ。

map_partitionsを使って少し複雑な処理

もう少し複雑な処理をしてみる。

以下の3つのキーワードのセットを用意し、各テキストがこの単語の組み合わせのいずれかを含んでいるかを調べてみる。

キーワードは以下のように定義する。

keywords = pd.Series([
    'you i', 'she i', 'he shit', 
])

以下のようなマッチしたかを調べる関数を準備する。

import functools
def is_match_keyword(keyword, text):
    target = set(text.lower().split(' '))
    parts  = set(keyword.split(' '))

    # partsに含まれていて、targetに含まれていない単語がなければTrue    
    return len(parts - target) == 0

def is_match_keywords(text_series, keyword_series):
    #  返り値となるSeriesを定義
    rtn = pd.Series(0, index=text_series.index)

    # キーワード一つずつにマッチするか確認
    for index, keyword in keyword_series.iteritems():
        # まだresultが0の行に対してのみis_match_keywordをapplyしていく
        result.loc[rtn == 0] |= text_series.loc[rtn == 0].apply(
            functools.partial(is_match_keyword, keyword))
    
    return rtn

先程と同じように、1行だけのデータに対して実行することで、meta情報を作成する、

meta = is_match_keywords(df['comment_text'].head(1), keyword_series=keywords)

今回はmap_partitionsという関数を使って計算してみる。

map_partitionsは分割したそれぞれのpartitionに対して普通にseries/dataframeが渡ってくるので、 関数内で普通にpandasのデータとして自由に扱える。

applyは複数列の情報を扱って処理するのには対応していないので、 複数列の情報を使った処理をしたい場合map_partitionsを使う。

# 実行する関数に引数を渡したい場合は、キーワード引数として渡す
res = ddf['comment_text'].map_partitions(is_match_keywords, keyword_series=keywords, meta=meta)

# 処理の実行
result = res.compute(scheduler='processes')
result.head()
0    False
1    False
2    False
3     True
4    False
dtype: bool

このような感じで、柔軟に普段pandasでしている処理を並列処理することができる。

個人的には、pandasで書いた処理をそのまま並列処理するにはmap_partitionsが使いやすいと思っている。


最後に

今回の記事のキーとなるのは以下の点です。

  • Daskのapplyやmap_partitionsを使えばいろんな処理が簡単に並列処理できる
  • Daskは返り値のmetaを指定しなければいけないけれど、1行だけ実行すると簡単にmeta情報を作れる

普段から処理を並列化することで、仕事の中の待ち時間を減らしていきましょう。

Daskに関する記事は調べてみるといくつかあるのですが、 簡単な計算処理をしているものが殆どで、実際に使うときの例が少ない印象だったので、今回の記事を書いてみました。

この記事でモデル作成中のみなさんの待ち時間を1分でも短くできたら嬉しいです :)

転職してデータサイエンティストからデータエンジニアになりました

前回の転職(ゲームプログラマからデータサイエンティストに転職しました - ぴよぴよ.py)から約1年半、再び転職しました。

ゲームプログラマ (2年)→ データサイエンティスト(1年)→ データエンジニア(new)

という感じのキャリアです。

前職の話

前職は職種的には「データサイエンティスト」をしていました。
受託の分析会社でお客さん先にいってミーティングしたり、分析したりする感じです。

いろんな案件を2並列くらいで数ヶ月くらいずつ参加し、テキスト情報の分類・タグ付け/DMや施策の最適化/DMP構築 などに関わりました。

私自身は基本的にアルゴリズムを考えたりツールを作ったりするのが好きなんですが、 データサイエンティストとして働いてみて、ゲームプログラマよりはデータサイエンティストの仕事のほうが楽しいなーと感じました。 競プロで培ったアルゴリズムが使えることも多々あって、嬉しかったです。

ただ、エンジニアリングしか学んでいなかったので、 転職当初はビジネス視点・社内/客前でのプレゼン力の無さには絶望しました。 限られた時間の中で、顧客が求めるものと乖離をしないものを作っていくためにはどうしたらいいのか、 というのを何度も何度も悩みました。

慣れない仕事も多かったですが、同年代のメンバーが多くたくさん友達ができて楽しかったです。

最後の方はマネージャとしてメンバーのアサインを決めたりなんかもしていて、マネジメントの難しさを少しだけ体感することができました。
しばらくはマネジメント側の仕事をする機会はなさそうだけど、次はメンバーとしてマネジメント側の立場も考えた上での振る舞いをしていけたらと思っています。

転職活動の話

仕事内容に関してはとても恵まれていたため、もう少し頑張りたい気持ちはありましたが、 今回は会社の方針と自分の考えの不一致で転職を決めました。

「行きたい会社がある」というベースの転職ではなかったので、 とにかく色んな会社の話を聞いて、面白そうなところをうけてみようという受け身なスタンスでした。

転職エージェントを使わなかったこともあり、

  • 自分に向いてる会社を自分で探す必要があるが、知らない会社にリーチしづらい
  • なかなか返信をいただけない企業に対して自分でステータス確認をする必要がある

という点は仕事しながら転職活動をする上で特に大変でした。

まずは受けてみたいと思えるポジションを探すため、自分の中の希望をとにかく書き出しました。
それから、それらの希望を

  • 本当に求めているもの
  • ただのわがまま
  • 我慢できること
  • やりたくないこと

に分類し、自分がどうしたいのか、ゆっくり向かい合いました。

その上で、

  • (MUST) アルゴリズムや実装が好き(&ちょっと得意)な自分を必要としてくれる
  • (MUST) フィーリングで自分がわくわくできるような会社
  • (BETTER) すでに得意なPythonもしくは(自分が書いてて楽しい)静的型付言語を利用できる

を満たす中で一番自分を高く買ってくれる会社に行きたいという方針に決めました。
かなりふわっとした要望なので、自分でポジションを探すのは難しく、 自分の性質をよく知っている友人たちに面白いポジションははないかと聞いて回りました。

10人ほどのお話を聞いた上で、 会社と内容にワクワクした以下の3つのお仕事のポジションを受けさせていただきました。

  1. ゴリゴリにサービス実装するお仕事(C++)
  2. 分析部門の1人目としてそこそこ大きい分析システムを構築するお仕事 (Python)
  3. AIをサービスに導入済みの会社で分析基盤構築(ミドルウェア開発)&分析をするお仕事 (Go, Python)

上記3つは仕事内容としてはそれぞれ全然違うため、 選んだポジションによって今後の自分の人生も結構かわるなーとワクワクしていました。

2と3のポジションに内定をいただき、迷った結果 3.の会社に行くことにしました。

このポジションを選んだ理由としては、

  • 分析のみに絞るよりもエンジニアリング要素があったほうが自分の強みを活かせると感じた
  • 面接で自分のスキルセットを確認していただくと同時に、丁寧な仕事内容の説明/チームメンバーのほとんどに会わせていただいたことで、面接の時点で自分がそのポジションで仕事をするイメージが明確にできた

という点が大きかったです。

新しい仕事の話

9/1に入社して、1週間たちました。

仕事内容としては上にも書いたようにGoでミドルウェア作ったりPythonでモデル作ったりそれをデプロイしたりする、分析屋/エンジニア/インフラを兼任するみたいな感じです。

分析に関わるエンジニアリングをまるっとやるということで、「データエンジニア」か「MLエンジニア」あたりで名乗っていこうと思います。

転職直後の様子はこんな感じです。

さいごに

転職活動にあたりポジションを紹介してくれた皆様、本当にありがとうございました!
いろんな会社があることを知れてとても面白かったです。

自分の経験を活かしつつ、チームに貢献できるよう頑張っていきたいと思います。

おまけ: データサイエンティストをする上で役に立った本

今まで技術書しか読んでなかったですが、 前職に転職してから、技術書以外の本でいろいろ本を読むようになりました。

他にもいろいろあって紹介する記事書きたかったんだけど、 もう書くこともなさそうなのでここでちょびっとだけ紹介しておきます。

世界最高のリーダー育成機関で幹部候補だけに教えられているプレゼンの基本

世界最高のリーダー育成機関で幹部候補だけに教えられているプレゼンの基本

先輩に「資料作るのに時間をかけすぎ」と言われ、すすめられた本で、 プレゼンの資料作りがめちゃくちゃ苦手な私のはとても勉強になった本でした。 これ読んで資料数個つくってから、資料作りのペースが半分くらいになった気がする。

イシューからはじめよ―知的生産の「シンプルな本質」

イシューからはじめよ―知的生産の「シンプルな本質」

王道の本ですが、仕事のススメ方を考える上でで自分の助けにもなったし、 社会人になりたての新卒に仕事の仕方を伝えるときの表現としてもつかえる内容の多い本でした。