こんにちは、システム本部技術部たんぽぽGの森本です。

先日のmixi大規模障害の原因となったmemcachedの不具合の詳細な解明ができました。
再来週まで発表を見合わせようと思ったのですが、早くお伝えしたほうがいいと思いましたので公開発表致します。

memcachedとlibevent

memcachedはlibeventというライブラリを使用してクライアントからの要求(接続、コマンド送信)を処理しています。
libeventを使用するにはevent_baseという構造体を用います。

main threadはmain_baseを使用します。

static struct event_base *main_base;
...
int main (int argc, char **argv) {
...
    main_base = event_init();
...
    /* enter the event loop */
    event_base_loop(main_base, 0);
...
}

worker threadはthread毎に自前のevent_baseを使用します。

typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
} LIBEVENT_THREAD;

static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();

通常の動作では、main threadとworker threadは自分専用のevent_baseにのみアクセスするので排他制御は必要ありません。
main threadはクライアントからのaccept要求を処理します。
worker threadは接続済みのクライアントからの要求(コマンド実行)を処理します。

コネクションが一杯になった時の処理

memcachedはコネクションがいっぱいになると accept_new_cons(false) を呼び出して、接続要求の受付を停止します。

static void drive_machine(conn *c) {
...
    while (!stop) {
        switch(c->state) {
        case conn_listening:
            addrlen = sizeof(addr);
            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    /* these are transient, so don't log anything */
                    stop = true;
                } else if (errno == EMFILE) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Too many open connections\n");
                    accept_new_conns(false);
                    stop = true;
                } else {
                    perror("accept()");
                    stop = true;
                }
                break;
            }
...
}

void accept_new_conns(const bool do_accept) {
    pthread_mutex_lock(&conn_lock);
    do_accept_new_conns(do_accept);
    pthread_mutex_unlock(&conn_lock);
}

void do_accept_new_conns(const bool do_accept) {
    conn *next;

    for (next = listen_conn; next; next = next->next) {
        if (do_accept) {
            update_event(next, EV_READ | EV_PERSIST);
            if (listen(next->sfd, settings.backlog) != 0) {
                perror("listen");
            }
        }
        else {
            update_event(next, 0);
            if (listen(next->sfd, 0) != 0) {
                perror("listen");
            }
        }
    }
...
}

static bool update_event(conn *c, const int new_flags) {
    assert(c != NULL);

    struct event_base *base = c->event.ev_base;
    if (c->ev_flags == new_flags)
        return true;
    if (event_del(&c->event) == -1) return false;
    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = new_flags;
    if (event_add(&c->event, 0) == -1) return false;
    return true;
}

accept_new_cons(false)から最終的に呼ばれるevent_set()の第3引数は0なので、libeventはsfd(accept用ソケット)の監視を停止します。
またdo_accept_new_conns(false)でlisten(next->sfd, 0)としてソケットのlisten backlogを0にします。

コネクションがクローズされた時の処理

worker threadはクライアントとのソケットが切断されると conn_close(c) を呼び出します。

static void drive_machine(conn *c) {
...
    while (!stop) {

        switch(c->state) {
...
        case conn_closing:
            if (IS_UDP(c->transport))
                conn_cleanup(c);
            else
                conn_close(c);
            stop = true;
            break;
...
}

static void conn_close(conn *c) {
    assert(c != NULL);
...
    accept_new_conns(true);
...
}

クライアントとのソケットが切断されたということは、接続に1本余裕ができたはずです。
conn_closeからaccept_new_cons(true)を呼びだし、必要であればacceptを再開します。

void accept_new_conns(const bool do_accept) {
    pthread_mutex_lock(&conn_lock);
    do_accept_new_conns(do_accept);
    pthread_mutex_unlock(&conn_lock);
}

void do_accept_new_conns(const bool do_accept) {
    conn *next;

    for (next = listen_conn; next; next = next->next) {
        if (do_accept) {
            update_event(next, EV_READ | EV_PERSIST);
            if (listen(next->sfd, settings.backlog) != 0) {
                perror("listen");
            }
        }
        else {
            update_event(next, 0);
            if (listen(next->sfd, 0) != 0) {
                perror("listen");
            }
        }
    }
...
}

static bool update_event(conn *c, const int new_flags) {
    assert(c != NULL);

    struct event_base *base = c->event.ev_base;
    if (c->ev_flags == new_flags)
        return true;
    if (event_del(&c->event) == -1) return false;
    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = new_flags;
    if (event_add(&c->event, 0) == -1) return false;
    return true;
}

コネクションが一杯になった時と同じ do_accept_new_conns(),update_event()が呼び出されますが、さきほどとはフラグの値が異なっています。
update_eventの内部では他の処理とは異なり、worker thread が main_eventに変更をくわえます。
排他制御が崩れるとしたらこの部分が怪しいのですが、accept_new_conns()でmutexを使用した排他制御を行っていますし、update_event内部でも

    if (c->ev_flags == new_flags)
        return true;
...
    c->ev_flags = new_flags;

としているので、すでにaccept再開後は以降の処理が走らないようになっています。

どこで排他制御が崩れたか

今まで見てきた流れでは

  • worker thread間はmutexで排他制御されている。
  • accept中は、c->ev_flagが立っているためworker threadは以降の処理を実行しません。
  • accept停止中はmain threadはaccept用ソケットの監視を停止しているので、main threadはmain_eventに変更を加えません。

ときちんと排他制御ができているように見えます。

問題はdo_accept_new_conns()にありました。

void do_accept_new_conns(const bool do_accept) {
    conn *next;

    for (next = listen_conn; next; next = next->next) {
        if (do_accept) {
            update_event(next, EV_READ | EV_PERSIST);
            if (listen(next->sfd, settings.backlog) != 0) {
                perror("listen");
            }
        }
        else {
            update_event(next, 0);
            if (listen(next->sfd, 0) != 0) {
                perror("listen");
            }
        }
    }
...
}

do_accept_new_connsはlisten_connリストの要素でループしています。
memcachedは、初期化時にgetaddrinfo()を用いてaccept待ちをすべきネットワークI/Fを取得して、listen_connを作成します。
通常でもeth0とloの2つが取得されるので、ほとんどの場合2つ以上のconnがlist_connリストに登録されている状態になります。
※-lオプションにて指定することもできますが、その場合は一つのネットワークI/Fしか使用できません。

connの定義は以下の様になっていて、event構造体を含んでいます。

typedef struct conn conn;
struct conn {
    int    sfd;
...
    struct event event;
    short  ev_flags;
...
};

update_eventの内部で c->event

    struct event_base *base = c->event.ev_base;

と event_baseを取り出していますが、do_accept_new_connsから呼ばれた場合、この値は main_baseになります。
複数のconnが同じmain_baseを指している状況です。
main_baseが一つしか無いのはしょうがないのですが、複数のconnが個別のev_flagsを持ち、ev_flagsにて処理の制御を行っているにも関わらず、対象は一つのmain_baseであることが問題を引き起こしています。

つまり、

  1. クライアントがソケットを切断する。
  2. worker thread(以下 W):切断を検知してconn_close(c)を実行。
  3. W:do_accept_new_conns(true)を実行。
  4. W:list_connの要素でループを開始。
  5. W:一つ目のconnに対して
    1. W:update_event(next, EV_READ | EV_PERSIST)を実行。
    2. W:event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);を実行。
      この時点でmain threadがaccept用ソケットの監視を再開します。
      ここで排他制御が崩れました。
      これ以降、main threadはworker threadとは非同期にmain_baseに変更を加えます。
  6. W:二つ目のconnに対して
    1. W:update_event(next, EV_READ | EV_PERSIST)を実行。
    2. W:if (c->ev_flags == new_flags) の判定はありますが、今回は二つ目のconnなので c->ev_flagsは0のままです。
      returnせずに次の処理に進みます。

    3. W:event_add(&c->event, 0)を実行。
      event_addの内部で、処理待ちのevent数(event_count)をインクリメントしています。
      つまり、main threadが動いているにもかかわらずworker threadがmain_baseに操作を加えています。
      ここでタイミングよく(悪く?)main threadも同時にevent_addを行いevent_countのインクリメントを行うと、event_countの値が一つ少なくなってしまいます。
      処理待ちのeventがあるにも関わらず event_countが0となってしまい、main threadはevent_base_loop()を抜けてしまい、memcachedが終了します。
int
event_base_loop(struct event_base *base, int flags)
{
...
	while (!done) {
...
		/* If we have no events, we just exit */
		if (!event_haveevents(base)) {
			event_debug(("%s: no events registered.", __func__));
			return (1);
		}
...
	}

	event_debug(("%s: asked to terminate loop.", __func__));
	return (0);
}
int
event_haveevents(struct event_base *base)
{
	return (base->event_count > 0);
}

動作確認

上記の説明が正しいことを裏付けるために負荷テストを行いました。

memcachedの設定は

memcached -U 0 -u nobody -p 11222  -t 4 -m 16000 -C -c 1000 -l 192.168.0.1 -v

とネットワークI/Fを制限しました。
そこに3台のクライアントからmalaさんのテストスクリプトを用いて接続しました。

大量の”Too many open connections”を出しつづけながらも70時間(Fri 19:00-Mon 16:00と)以上落ちること無く動作しつづけました。

上記設定から-l オプションを外すと面白いぐらいにmemcachedが落ちまくります。

clock_handler

実はmemcachedにはtimerイベントも存在しており、accept中・accept停止などに関わらず常に動作しています。
低い確率ではありますが、このメソッド内でのevtimer_addによりevent_count値の不整合が起こる可能性もあります。

static void clock_handler(const int fd, const short which, void *arg) {
    struct timeval t = {.tv_sec = 1, .tv_usec = 0};
...
    evtimer_set(&clockevent, clock_handler, 0);
    event_base_set(main_base, &clockevent);
    evtimer_add(&clockevent, &t);

    set_current_time();
}

まとめ

multi threadのプログラムは書くのも大変ですが、再現とデバッグが大変だとつくづく実感しました。
Cでよくあるメモリ破壊が起こっていなかっただけマシだったのかもしれません。

patchの方針は、llameradaさんの方針が正しい方向だと思います。
clock_handlerでの不整合も治せますし。
この方針でmemcachedコミュニティと相談してみたいと思います。

こんにちは。システム本部技術部たんぽぽGの森本です

補足を追記しました (2010/08/20 15時)

先日のmixi大規模障害についての続報です
今回は小ネタはありません

はじめに

まず初めにtwitter/blogなどを通じて今回の問題の解析を行っていただいたみなさんに感謝の言葉を捧げたいと思います

  • kzk_moverさん
  • stanakaさん
  • mala(bulkneets)さん
  • llameradaさん

(順不同)
ありがとうございました
書き漏らした人ごめんなさい

memcachedはすごい

今回の件でmemcachedに対して不安感を持たれた方もおられるとお聞きしました
説明不足だったせいで誤解を与えてしまい申し訳ありません
きちんと設定および監視を行っていれば通常の使用にはまったく問題はありません

弊社にて -c 30万で起動したmemcachedに対して、先のテストスクリプトにて28万接続を頻繁にくりかえす負荷テストを行いました
19:00から翌朝の10:00まで何の問題もなく1行のエラーも吐くこともなく17時間動作したことを確認しました

詳細

今回のmemcached不具合の内容をまとめました

main thread

  • accept()で新しいクライアント接続を受け取るが失敗する
  • errno は EMFILE なので “Too many open connections” と判断する
  • 新しい接続要求を受け付けなくするためにaccept_new_conns(false); を呼び出し、listen()のbacklog数を0にする
  • update_event(next, 0)経由でevent_set()監視対象イベントも0にする

この状態だと main thread はaccept用socketのイベントは監視せず、timerイベントだけを受け取る状態になります

worker thread

  • クライアントが接続を切断したので conn_close() を呼び出して後始末を行う
  • 1本接続が減ったので新しい接続を受け付けられるはずなのでaccept_new_conns(true) を呼び出す
  • update_event(next, EV_READ | EV_PERSIST)経由で、main threadにaccept用socketのイベント監視を再開させる

ここで排他制御がうまくいかないと、処理待イベント数(event_count)が同時にインクリメントされ、event_countが実際の処理待ちイベント数よりも少ない数を保持することになります
その後 main threadがイベントを処理するとデクリメントされ、最終的にevent_count が0になり処理ループを抜けmemcachedが終了していました

accept_new_cons()自体はmutexを用いて排他制御が行われていましたが、それ以外の経路で main_base を操作する経路がいくつかありました
一つは clock_handler()、もう一つは accpetソケットに接続要求があった場合にlibeventが呼び出す epoll_dispatch()です
event_countに不整合が発生した場合の stack trace を取ったところ
event_base_loop => epoll_dispatch => event_queue_insert と呼び出されていました

再発防止策

memcached を -c 30万で設定しました
実際の接続数を監視してしきい値を超えるとアラートメールが飛ぶように設定しました
長期的にはアーキテクチャの変更が必要なので下記の項目について検討中です

  • サービス単位での個別のmemcachedを使用
  • memcached proxyの使用
  • UDPの使用
  • 不揮発なキャッシュシステム

まとめ

私見ですが、memcachedを運用する場合には -c による接続数の制限はできるだけ大きくするのが望ましいと思います
大事なのはmemcachedがきちんとサービスを提供できていることなので、CPU/Load/Memory/cache hit/response timeなどをきちんと監視していれば実際の接続数がいくらなのかは問題ではないかと思います
ちなみに現在 mixi では -c 30万で運用中です
-c を大きく設定するにはいくつかのカーネルパラメータの変更が必要です 詳しくはググってください
また、実際の接続数が大きくなるとネットワークバッファなどのリソースに影響がでますので、適宜監視が必要です

補足

「-c 30万は対症療法」「patchは書かないのか」とのご意見をいただきましたので若干の補足です

-c 30万

今回のmemcachedの不具合は単なる高負荷状態では発生せず、接続限界数に達したときのみ発生します
-c で設定した接続数に達して新しい接続が受けられなくなった時点でキャッシュしている値に不整合がでる可能性があるので、memcachedが落ちなくなったとしても、mixiというサービスとしては問題となります
以下のような場合にキャッシュ値に不整合がでます

  • ユーザーがmixiにアクセス
  • httpd Aがニックネーム(たろう)をMySQLから取得
  • httpd Aがmemcachedに保存
  • ユーザーがニックネーム(たろう => 太郎)を変更
  • httpd Bが新しいニックネーム(太郎)をMySQLに保存
  • httpd Bがmemcached上のニックネームを削除または更新
  • 接続限界数に達しているため処理が失敗する
    不整合発生

  • ユーザーがmixiにアクセス
  • httpd Aがニックネーム(たろう)をmemcachedから取得
    ※MySQL上のニックネームは太郎です

この様に一部のサーバーだけがmemcachedにつながった状態だとキャッシュしている値に不整合がでます
特に接続限界数に達するのが一時的なものであり、その後は正常に接続が行われる場合にはアラートにひっかかりにくく発見するのが困難になります
memcachedが完全に落ちた場合は自動的に再起動が行われるので一時的にMySQLの負荷が上昇するだけですみます

上記のようなことから、そもそも「接続数限界に達すること」自体が(memcachedのではなくmixiサービスを提供する上での)問題だと判断し、「-c 30万」および「十分に余裕を持たせた接続数の監視」をもって解決策としました

patch

memcachedが接続最大数に達した場合にどのように振る舞うのがよいのかを考え中です
現在の動作はlisten(fd,0)と接続待ち行列を0にしてクライアントからの接続要求を即座に拒否する作りになっています
しかし、この方法だと上で述べたようにキャッシュ値の不整合が起こる可能性があります
他の方法、例えば古い接続もしくはリクエストが来ていない接続を強制的に切断するほうが、性能は低下するもののキャッシュ値に不整合がでないのではないか、など悶々としております
このあたりはmemcached MLで相談してみようとおもいます

こんにちは。システム本部技術部たんぽぽGの森本です

先日のmixi大規模障害についてのブログです。
はじめにお断りしておきますが、弊社CTOがtwitterで公開した以上の情報はまだ得られておりません。
twitterでは書ききれなかった細部を補足してみたいと思います

現状判明しているのは以下の点です

  • memcachedに大量の接続・切断を行うとmemcachedプロセスが突然終了することがある
  • memcachedには異常時に終了するフローもあるが、同時に出力されるはずのエラーログは出ていなかった
  • coreも出力されていなかった

テスト環境にて追試を行ったところ、なんどか再現させることができましたが、確実に発生する条件は未だ不明です。

障害時の memcachedのバージョンは1.4.4, libeventのバージョンは1.3bです
memcached の起動オプションは以下のとおり

./memcached -vv -p 11222 -u nobody -m 16000 -c 40720 -C -U 0 -t 4 -b 4096

クライアントマシンは 3 台
各クライアントでは 5000 個 fork、1 プロセスでは最大 50 個コネクション生成。最大に達すると 1 つコネクションを再生成し get 命令を 1 つ発行。専用クライアントライブラリでは無く、ソケットを直接操作

クライアントコードは以下

#!/usr/local/bin/activeperl
 
use strict;
use warnings;
 
use IO::Socket::INET;
 
my $host = shift;
my $port = shift;
 
foreach my $sig (keys %SIG){
    next if($sig eq '__WARN__');
    next if($sig eq '__DIE__');
    next if($sig eq 'INT');
    next if($sig eq 'TERM');
    $SIG{$sig} = sub {
        my $sig = shift;
        print "Cought a sig[$sig]\n";
    };
}
 
for(1 .. 50*100){
    foo();
}
sleep 1 while(1);
 
sub foo {
    fork && return;
 
    my @sock;
    for (;;) {
        if(50 < @sock){
            my $nth = int(rand(@sock));
            my $s = splice(@sock, $nth, 1);
            print $s "get foo\n";
            my $ret = <$s>;
            close($s);
            next;
        }
        my $s = IO::Socket::INET->new(PeerAddr => $host, PeerPort => $port, Proto => 'tcp');
        if ($s) {
            print $s "get foo\n";
            my $ret = <$s>;
            print STDERR ".";
            push @sock, $s;
        } else {
            warn "XX [$$]Can't open port:[$!]\n";
            if (@sock) {
                my $nth = int(rand(@sock));
                my $s = splice(@sock, $nth, 1);
                print STDERR "close socket\n";
                print $s "get foo\n";
                my $ret = <$s>;
                close($s);
            } else {
                sleep 1;
            }
        }
    }
}

memcachedにいくつかdebug文を仕込んだところ、以下のことがわかりました。

  • memcached.c:main()にある event_base_loop から抜けていた
    通常はここで無限ループするはず
  • event_base_loop()中の event_haveevents(base) が false を返していた
  • event_haveevents()で base->event_count が0になったため return (base->event_count > 0); で false を返していた

コネクション最大状態で、open/close を激しく繰り返すと発生しやすい印象です。

5000プロセスとかなり負荷を掛けているつもりなのですが再現度はかなり低いです
もしかすると1台のクライアントで同時に行えるopen/closeはコア数が上限になってしまうなどの制限があるのでしょうか
引き続き確実な再現方法と原因究明に向けて調査を行っていきます

こんにちは。パートナーサービス部の加藤和良です。

前回、mixi における開発者テスト について説明しました。だいぶ間があいてしまいましたが、今回は、そのテストを定期的に実行する 継続的インテグレーション の仕組みを紹介したいと思います。

テストが遅い

実は、mixi のテストは「遅い」という大きな問題を抱えています。

Micheal Feathers は『レガシーコード改善ガイド』のなかで、単体テストが高速に実行できることの重要性を解き「単体テスト」を厳しく定義します。

次に当てはまるものは単体テストではない。

  1. データベースとやり取りする
  2. ネットワークを介した通信をする
  3. ファイルシステムにアクセスする
  4. 実行するために特別な環境設定を必要とする (環境設定ファイルの編集など)

上記に該当するテストが悪いというわけではない。多くの場合において、そのようなテストを書く価値はあり、しばしばテストハーネス内に記述される。しかし単体テストは、そのようなテストと切り分けて、変更を行うたびに高速で実行できるように保ち続けることが重要である。

この定義にしたがうと、mixi のテストで「単体テスト」と呼べるものはごくわずかで、それらは実際「変更を行うたびに高速で実行できる」速度に達していません。

そのためか mixi のテストは実行をさぼられがちで

  • 安定版からのブランチを作成して、開発を開始
  • 開発が一段落したので、そのブランチで (or マージしてから) テストを走らせる
  • テストが失敗するので調べてみたら、そもそも安定版のほうでもテストが失敗していた

といったことが時折ありました。

Buildbot

継続的インテグレーション導入の目的は、こういった状況を改善することでした。現在は、Subversion レポジトリ上の安定版のツリーにコミットが行われると、自動でテストが実行され、結果が IRC に通知されます。テストの結果は Web からも確認できます。

テストが失敗したときの IRC

これら一連の動作は Buildbot をつかって実現しています。Buildbot は Python で書かれた継続的インテグレーションのためのソフトウェアで、たとえば WebKit でも使われています。

継続的インテグレーションはテストを高速に実行するためのものではありません。ただ、テストが遅い状況でも、遅さに起因する問題をやや軽減させる効果はあるんじゃないかとは思います。

テストのアルファベット分割

mixi では継続的インテグレーションからのフィードバックをはやく得るために、2つの工夫をおこなっています。ひとつがテストのアルファベット順の分割です。

Buildbot では全行程がおわるまでの個々の処理を BuildStep と呼ばれる単位に分割します。BuildStep が成功すれば Web インターフェースで緑色に、失敗すれば赤色に表示されます。

mixi の場合、当初は

  • svn checkout
  • make test

と2つの BuildStep だけがあったのですが、この設定では make test の最中の進行状況がわかりにくいという問題がありました。前述のとおり mixi のテストは遅いので、わかりにくい時間もやや長めです。

そこでテストを何段階かに分割することを考えました。一番低レイヤーのライブラリから一番上のアプリケーションまで順に BuildStep に切る、みたいなのが真っ当なやりかただとは思うのですが、mixi のコードはお互いの依存関係がちょっと混迷を極めていて、あんまり良い分割単位が見出せません。

そこでいまは、単純かつ機械的に

  • ^t/lib/Mixi/A
  • ^t/lib/Mixi/B
  • それ以外

とファイル名をベースに分割するようにしています。

アルファベット順

アルファベットごとのテストの量には大分ばらつきがあり ^t/lib/Mixi/W みたいにマッチしないこともあります。ただ、テストの進行中および失敗した際のわかりやすさは大分改善されました。

最近変更した部分に対するテスト

もうひとつの工夫が「最近変更した部分に対するテスト」の別実行です。繰り返しになりますが mixi のテストは遅いため

  • Buildbot が全てのテストを実行し、失敗を報告
  • 失敗に対応する修正をコミット
  • Buildbot が再度全てのテストを実行し、成功を報告

というサイクルがあまり早く回せません。アルファベット分割も、例えていうなら「遅い処理の間はプログレスバーを出しましょう」という話でしかありません。

そこで mixi では、全テストの実行とは別に、最近修正されたソースコードに対するテストだけの実行も行っています。

最近変更した部分に対するテスト

現在は、1分ごとにレポジトリをポーリングして

full
コミットが10分間ないとすべてのテストを実行しはじめる
recent
コミットがあるごとに最近変更されたコードに対応するテストを実行する

という二系統のテストを (Buildbot 用語でいう別の Builder で) 行っています。

mixi には lib/Mixi/Member.pm へのテストは

  • t/lib/Mixi/Member.t
  • t/lib/Mixi/Member/ 以下

に置くというゆるやかな慣例があるため、変更されたファイルから実行すべきテストを探すのは比較的簡単です。もちろん、この方法では「ある変更が依存関係の彼方にあるコードを壊す」といった状況は検出できませんが、そこは full に任せています。

まとめ

というわけで、mixi における継続的インテグレーションの仕組みと、こまごました工夫について紹介しました。

説明の順番が、まず開発者テスト、次に継続的インテグレーション、というふうになってしまいましたが、実際には、このふたつは並行して進めていました。

継続的インテグレーションの存在は「テストを書いたけど実行されない」という問題をなくし、すでにテストを書いている人のリターンを増やします。また、Buildbot が IRC にテストの失敗を通知し、実際に本番でもバグがみつかる、という流れには、いままで「テストってなんだろう?」と思ってたひとが「書くと良さそうだ」となる効果があったんじゃないかと思います。

開発者テストが書きにくかったり、遅かったり、いまひとつ開発者のなかに浸透していなかったりする環境でも、とりあえず継続的インテグレーションからはじめてみるのはおすすめですよ、というのが伝われば幸いです。それではまた。

こんにちは。開発部最後の良心、mikioです。今回はLua処理系の並列化とそこでのKyoto Cabinetの利用法についてご紹介します。

サーバサイドスクリプティングといえばLua

Kyoto CabinetのLuaバインディングは後回しにしてKyoto Tyrant的なサーバの設計を進めていたのですが、やはりそのサーバにもスクリプティング機能を持たせたくなりました。つまり、サーバがデフォルトで提供する機能群だけでなく、ユーザがスクリプト言語で記述した任意の機能を追加して利用できるようにするということです。

Tokyo TyrantではLua拡張と呼ばれる機能を用いてそれを実現しています。サーバの起動時にLuaのスクリプトを記述したファイルを読み込ませて、そこで定義した関数をリモートから呼び出せるようにしています。そこで実行されるLuaの処理系にはTTが管理するデータベースを操作するためのオブジェクトが予め定義されているので、それを介して任意のデータ処理を行うことができます。

Kyoto Tyrant(仮称)の設計はまだまだ固まっていませんが、おそらくLuaを組み込むことになるでしょう。TTでは独自のDB接続用モジュールを書いていましたが、そのモジュールの構造はTCのLuaバインディングと酷似するものでした(というかほとんどコピペで作りました)。二重管理が嫌いな私としては、KTではKCのLuaバインディングそのものを呼び出す形でスクリプティング機能を実現したいと思います。

なぜLuaなのかという点については以前の記事でも述べましたが、言語仕様が小さくて覚えやすく、処理系も高速で軽量でリエントラントだからです。

Luaバインディング

先日、KCのLuaバインディングをリリースしました。TCのそれよりもさらに使いやすくなっています。KCのJava、Python、Ruby、Perlの各バインディングと同様に、言語共通のIDLに準拠するインターフェイスを備えるとともに、Lua言語に合わせたスタイルで操作できるようにもしています。具体的なAPIやサンプルについてはAPI文書をご覧ください。

Luaならではのインターフェイスと言えばクロージャとメタテーブルと汎用for文でしょうか。

require("kyotocabinet") -- KCのモジュールをロードする
kyotocabinet.import()   -- kyotocabinet.*をグローバル名前空間に取り込む

-- データベースを操作するためのコールバック関数
function dbproc(db)

   -- メタテーブルによってテーブルインターフェイスで書き込み
   db["foo"] = "hop"
   db["bar"] = "step"
   db[3] = "jump"

   -- クロージャによるトランザクションでレコードを更新
   function tranproc()
      db["foo"] = 2.71828
      return true
   end
   db:transaction(tranproc)

   -- Visitorパターン風にレコードを更新
   function mulproc(key, value)
      return tonumber(value) * 2
   end
   db:accept("foo", mulproc)

   -- 汎用for文でレコードを横断取得
   for key, value in db:pairs() do
      print(key .. ":" .. value)
   end

   -- Visitorパターン風に全レコードを更新
   function upproc(key, value)
      return string.upper(value)
   end
   db:iterate(upproc)

   -- 外部カーソルでレコードを横断取得
   function curproc(cur)
      cur:jump()
      function printproc(key, value)
         print(key .. ":" .. value)
         return Visitor.NOP
      end
      while cur:accept(printproc) do
         cur:step()
      end
   end
   db:cursor_process(curproc)

end

-- 上記関数をデータベースに適用
-- (openやcloseが不要なところがナイス)
DB:process(dbproc, "casket.kch")

Luaの実力

裏の仕組みは違えども、上記と同じような使い方はPythonやRubyやPerlでもできるので、Luaの利点として挙げるようなものではありません。というかミニマリズムに基づくLua言語の記述力は他の言語に比べると貧弱なのは否めず、言語としては個人的にはあんまり好きではありません。特に配列の添(ry

言語の機能やライブラリの拡充度からいくと他者に大きく水をあけられてはいますが、処理系の効率の良さがその欠点を補って余りあるというのがLuaの存在意義でしょう。特筆すべきはネイティブスレッドとの親和性が高いことです。

Luaはコルーチン(協調スレッド)と呼ばれる並行プログラミングのための機構を備えていますが、これは各スレッドが自分で実行権を手放すことによって「並行」処理を実現するための機構であり、「並列」処理つまり複数のCPUコアを同時に使って演算をする機構ではないので、ここでは言及しません。並列処理を行ってサービスのスループットを向上させるためにはあくまでネイティブスレッドを使う必要があります。

Lua処理系のAPIはいかなるグローバル変数も静的変数も使っておらず、完全にリエントラントです。その点はJavaも同じですが、Lua処理系のインスタンスはメモリ使用量も少ないし起動にかかるオーバーヘッドも小さいので、スレッド毎にLua処理系のインスタンスを割り当てるのにうってつけなのです。

サーバサイドスクリプティングでLuaを用いる場合、サーバのスレッドプールに存在するスレッドの上でLua処理系のインスタンスが動くことになります。その際に各スレッドに別個のLua処理系のインスタンスを割り当てると、Lua処理系に関してスレッド間のレースコンディションが起こり得ないので、排他制御に伴うオーバーヘッドを一切被ることなく、目的の処理を実行することができます。というより、Lua処理系はネイティブスレッドに対する排他制御機能を自身では全く持たないので、この方法がLuaによる唯一の並列化手法であると言えます。

しかし、Lua処理系のインスタンスをスレッド毎に割り当てると、スレッド間でデータを共有することができなくなるという欠点が生じます。ひとつのデータベースを複数のスレッドで同時に扱えないと意味がないので、それだと困ってしまいます。

そこで、KCのLuaバインディングでは、複数のLua処理系で同一のデータベースオブジェクトを共有する仕組みを提供することにしました。スレッド間で共有したいデータはすべてKCのデータベースに入れれば目的を達成できます。KC自体も並列性が高いのでそのようなデータ操作も並列に行うことができます。オンメモリDBを使えばファイルIOのオーバーヘッドを気にする必要はありませんし、逆にメモリに収まらないような規模のデータもファイルDBに収めることで共有と永続化ができます。また、オンメモリツリーDB(赤黒木)やファイルツリーDB(B+木)を使えば順序を管理できるので、リストやプライオリティキューとみなしてタスク管理に使うこともできます。

DBオブジェクトの共有

比較のために、まずは単一のLuaインスタンスのみでDBを作る方法について見てみます。

db = DB:new()
db:open("casket.kch", DB.OWRITER + DB.OCREATE)
db:set("japan", "tokyo")
db:set("england", "london")
db:set("germany", "berlin")
db:close()

DB:newメソッドを呼び出すと、その内部でC++のDBポインタが作られ、それをLuaから操作するためにLuaオブジェクト(Luaテーブル)としてラップしたものを返します。ここではそれをdbという変数で受けて、以降の操作ではそれを介してデータベースファイルを開いたりレコードを書き込んだりデータベースを閉じたりしています。

次に、複数のLuaインスタンスで単一のDBを共有する方法について見てみましょう。C++でネイティブスレッドを立てて、その各スレッドでLuaインスタンスを作って使うわけです。Lua層でなくC++層でDBポインタを作り、それを使いまわすのです。

// DBを作る
kyotocabinet::DB db;
db.open("casket.kch", DB.OWRITER | DB.OCREATE)

// スレッドクラス
class ThreadImpl : pubic kyotocabinet::Thread {
pubic:
  // コンストラクタでDBポインタを受け取る
  ThreadImpl(DB* db) : db_(db) {}
  // スレッドの処理内容
  void run() {
    // Luaインスタンスを作る
    lua_State* lua = luaL_newstate();
    // Luaの標準ライブラリを開く
    luaL_openlibs(lua);
    // DBポインタをLuaの軽量ユーザデータとしてラップ
    lua_pushlightuserdata(lua, db_);
    // 上記をグローバル変数としてエクスポート
    lua_setglobal(lua, "_db");
    // Luaスクリプトをロードする
    luaL_loadfile(lua, "myscript.lua");
    // 上記スクリプトを実行する
    lua_call(lua, 0, 0);
    // スレッドプールのワーカーとして働き、ジョブがあれば処理する
    while (alive) {
      Job job = get_job();                 // ジョブを取り出す
      lua_getglobal(lua, "do_job");        // Lua関数を取得
      lua_pushstring(lua, job.message());  // ジョブの内容を引数に指定
      lua_call(lua, 1, 0);                 // 上記関数を呼び出す
    }
    // Luaインスタンスを破棄する
    lua_close(lua);
  }
private:
  DB* db_;
};

// スレッドを作る
ThreadImpl t1(&db);
ThreadImpl t2(&db);

// スレッドを開始する
t1.start();
t2.start();

// イベントループ
while (alive) {
  Event ev = get_event();   // イベントを取り出す
  set_job(ev.job());        // ジョブを登録する
}

// スレッドの停止を待って回収
t1.join();
t2.join();

// DBを閉じる
db.close();

上記ではKCのスレッドライブラリを使っているおかげで、Java風にスレッド処理を定義することができています。Threadクラスを継承してrunメソッドをオーバーライドして、そうしてできたオブジェクトのstartメソッドを呼ぶというやつです。もちろん、別のスレッドライブラリを使ってもいいですし、POSIXスレッドやWin32スレッドのAPIを直接叩いてもいいです。

要点は、メインスレッドでDBを作って、そのポインタを個々のワーカスレッドに与えるということです。上記ではLuaの「_db」というグローバル変数としてそのポインタをLua側に渡しています。Lua側ではDBポインタをDBオブジェクトにラップして使います。また、上記ではワーカスレッドがジョブキューのコンシューマとして働くことを想定しており、各ジョブに対応してLuaの「do_job」という関数を呼ぶようにしています。となると、Lua側のスクリプトは以下のような内容になるでしょう。

-- 起動処理
db = DB:new(_db)

-- タスクを処理するために呼ばれる
function do_job(message)
  -- DBを使ってIDを採番して表示してみる
  local jobid = db:increment("jobid", 1)
  print(jobid, message)
end

Luaスクリプトはスレッドの起動時に読み込まれて実行されます。先頭で「_db」というグローバル変数からDBポインタを取り出して、DBクラスのコンストラクタに渡してDBオブジェクトを生成しています。そして、do_job関数を定義しておいて、その中でDBオブジェクトを操作しています。

実際の性能

Javaバインディング(Javaスレッド)とLuaバインディング(ネイティブスレッド)の性能比較をしてみました。KCのファイルハッシュデータベースに対して合計100万レコードの書き込みと読み込みをスレッド数を変えて行った場合にかかる時間を測定したものです。

Java Lua
1スレッド書き 6.903秒 2.073秒
1スレッド読み 6.649秒 2.026秒
2スレッド書き 4.372秒 1.455秒
2スレッド読み 3.711秒 1.105秒
4スレッド書き 2.880秒 1.154秒
4スレッド読み 2.287秒 0.818秒

JavaもLuaもスレッド数の増加に応じて所用時間が短くなって高速化していることがわかります。Luaは1スレッドのみでも4スレッドのJavaを凌駕する性能を持ち、そしてスレッド数を増やすとさらに高速化するということです。

Lua+ネイティブスレッドの構成だと少なくともユーザランドでは一切の排他制御をしないので、理論的にはCPUコア数を上限とするスレッド数に対して線形にスケールするはずです。実際にはDB層やOS層で多少のブレーキがかかるので線形とまではいかないわけですが、スレッドを増やすことで並列化の恩恵が体感レベルで受けられるということが今回の実験で確かめられたと思います。

まとめ

Web業界で生活しているとLua使いに出会うことは非常に稀なのですが、サーバサイドでもLuaは実用になる技術なのです。かく言う私もマイナー言語と思って優先度を下げていましたが、マルチコア・メニーコア時代にはLuaのような軽量な処理系をスレッド毎に割り当てるという手法も面白いんじゃないかなと思う次第です。C/C++で書いた高効率なサーバ実装にアドオンでき、Javaより高性能なスクリプト言語として機能するLua。最適化された世界に少しの柔軟性を持たせるLua。Lua最強説。

そして、複数のLuaインスタンスを並列させることによってインスタンス間でデータが共有できなくなる問題をKCは見事に解決してくれます。夢が広がりんぐですね。クラウドのトレンドには全く乗ってないプリミティブなアプローチではありますが、コア数の増加に対してスケールするようにすれば単一ノードの性能もまだまだ上げられます。KCはKVSのバックエンドとしてのみでなく、いわばDomain Specific Databaseのような実装を支援するツールとして育てていきたいと考えています。