James君!〜パイプライン再説

疑惑のパイプライン

さて、ここまで実戦的にいろいろと見てきたわけだが、<spoolmanager> 部の Mailet パイプラインについて、そろそろ疑問が湧いてくる...というのがプログラマというものではなかろうか。なのでここでそういう疑問を解決しちゃう。たとえば、

  1. [終了]Mailet じゃなくて、パイプラインが尽きたらどうなるの??
  2. Matcher が Recipient を変更した場合、その変更は他の Matcher まで及ないよね?
  3. Matcher の NOT とか AND とか OR ってどうやるの?
  4. Matcher のオプションに別な Matcher を取る「高階 Matcher」みたいなことって出来るの?

というような疑問が湧いてきていない? まあ、ここらへんを解決するためには、「Use the Source, Luke!」でやっていこう。実際には SpoolManager は James 本体とは別スレッドで動いており、要するに james-2.2.0/apps/james/var/mail/spool にメール・オブジェクトが保存されると、それをトリガにして動き出す仕掛けである。だから結構独立性が高くて、他のブロックの情報を手に入れるのが少々厄介...というあたり問題がないわけではない。

SpoolManger

で、SpoolManager の本体のクラスは、org.apache.james.transport.JamesSpoolManager である。まあこのクラスは Runnable interface を implements していることから判るように、スレッドだ。でやはり Thread.run() メソッドが実際に「spool を監視して、そこにオブジェクトが保存されたらパイプラインをドライブする」という機能を実現している。まあ、このあたりを大まかに書くと、

    public void run() {
        numActive++;
        while(active) {
            String key = null;
            try {
	        // メールをスプールから得る。なければここで待つ
                MailImpl mail = (MailImpl)spool.accept();
                key = mail.getName();
                process(mail);
                if ((Mail.GHOST.equals(mail.getState())) ||
                    (mail.getRecipients() == null) ||
                    (mail.getRecipients().size() == 0)) {
		    // 終了条件ならばここでスプールから削除
                    spool.remove(key);
                } else {
                    spool.unlock(key);
                }
                mail = null;
            } catch (InterruptedException ie) {
            } catch (Throwable e) {
	      /* エラー処理 */
            }
        }
        numActive--;
    }

    protected void process(MailImpl mail) {
        while (true) {
	    // 現在のプロセッサ名を得る
            String processorName = mail.getState();
            if (processorName.equals(Mail.GHOST)) {
                return;
            }
            try {
	        // プロセッサ名からそのプロセッサを取得する
                LinearProcessor processor
                    = (LinearProcessor)processors.get(processorName);
		// プロセッサに処理をさせる
                processor.service(mail);
                return;
            } catch (Throwable e) {
	        /* エラー処理 */
            }
        }
    }

という処理になっている。要するに処理中はずっとスプールにオブジェクトがあり、その中に「次のプロセッサ」が Mail.getState() として保存され続けるわけである。だから、setState( Mail.GHOST ); と [終了] させたのは、「次のプロセッサはないよ」とここに伝えるためにやっていたことなのである。

LinearProcessor

要するに JamesSpoolManager はパイプラインのドライバに相当する部分だ、ということが判ったことと思う。次は「個々のパイプライン」に相当するクラス、org.apache.james.transport.LinearProcessor である。要するにこれは <processor> タグに相当するクラスである。

    public void service(MailImpl mail) throws MessagingException {
        /* mail をパイプラインの先頭に入れておく */
        List[] unprocessed = new List[matchers.size() + 1];
        for (int i = 0; i < unprocessed.length; i++) {
            unprocessed[i] = new ArrayList();
        }
        unprocessed[0].add(mail);
        String originalState = mail.getState();

        mail = null;
        int i = 0;
	/* パイプラインに相当するループ */
        while (true) {
            unprocessed[unprocessed.length - 1].clear();
            mail = null;
	    /* 処理すべきメールを取りだす */
            for (i = 0; i < unprocessed.length; i++) {
                if (unprocessed[i].size() > 0) {
                    mail = (MailImpl)unprocessed[i].remove(0);
                    break;
                }
            }

            if (mail == null) {
                return;
            }

            Collection recipients = null;
	    // Matcher を取り出す
            Matcher matcher = (Matcher) matchers.get(i);
	    /* 実は matchers, mailets の最後の要素は、初期化で内緒で
	    追加された、terminatingMatcher,terminationMailet である。
	    terminationMailet はログに「パイプラインが終った」旨を出して、
	    GHOST して終る、というだけの処理である。ここらへん、
	    LinearProcessor#closeProcessorLists()
	    JamesSpoolManager#initialize()
	    を参照してもらいたい。*/
            try {
	        // Matcher.match() の呼び出し
                recipients = matcher.match(mail);
                if (recipients == null) {
                    recipients = new ArrayList(0);
                } else if (recipients != mail.getRecipients()) {
                    verifyMailAddresses(recipients);
                }
            } catch (MessagingException me) {
	        /* エラー処理 */
            }

	    // さてここの処理重要!
            Collection notRecipients;
            if (recipients == mail.getRecipients() || recipients.size() == 0) {
                notRecipients = new ArrayList(0);
            } else {
	        // Recipient が編集されている
                notRecipients = new ArrayList(mail.getRecipients());
                notRecipients.removeAll(recipients);
		// 返されなかった Recipient だけが notRecipients に入っている
            }

            if (recipients.size() == 0) {
	        // Matcher が「処理対象である」と判定しなければ、
		// ループの次のための用意をして、continue
                unprocessed[i + 1].add(mail);
                continue;
            }

            if (notRecipients.size() != 0) {
	        // Recipient が編集されているなら、分身の術を使う。
		// Mail をコピーして
                MailImpl notMail = (MailImpl)mail.duplicate(newName(mail));
		// パイプラインの「次」のために用意する
                notMail.setRecipients(notRecipients);
                unprocessed[i + 1].add(notMail);
		// このMatcher で一致した Reciepient をこのメールに保存する
                mail.setRecipients(recipients);
            }

	    // Mailet を取得し、
            Mailet mailet = (Mailet) mailets.get(i);
            try {
	        // Mailet を実行する
                mailet.service(mail);
                verifyMailAddresses(mail.getRecipients());
            } catch (MessagingException me) {
	        /* エラー処理 */
            }

            if (!mail.getState().equals(originalState)) {
	        // GHOST なら終了
                if (mail.getState().equals(Mail.GHOST)) {
                    mail = null;
                    continue;
                }
		// プロセッサが変わっている
		// なら、spool に保存して、別な LinearProcessor が起動して
		// これを処理することになる。
		// 不一致 Recipient があるかもしれないので、処理は続ける
                spool.store(mail);
                mail = null;
                continue;
            } else {
	        // 次の Mailet のために準備
                unprocessed[i + 1].add(mail);
            }
        }
    }

...というわけである。疑問の、

  1. [終了]Mailet じゃなくて、パイプラインが尽きたらどうなるの??
  2. Matcher が Recipient を変更した場合、その変更は他の Matcher まで及ぶの?

にはこれで答えたことになるよね。要するに、

  1. GHOST されてないメールがパイプライン末尾に到達したら、メールを GHOST して spoolmanager-*.log に「Message Mail**** reached the end of this processor, and is automatically deleted. This may indicate a configuration error.」というメッセージを吐くだけの Mailet が実行されて、メールは消える。
  2. Matcher が Recipient を変更した場合には、そのメールが2つに「分身」し、以降それぞれ別々なメールとして処理を継続していく。

ということだ。これをテストするんだと、

$ telnet localhost 1025
HELO localhost
250 bizet Hello localhost (localhost [127.0.0.1])
MAIL FROM: <hoge@hoge.com>
250 Sender <hoge@hoge.com> OK
RCPT TO: <sug@localhost>
250 Recipient <sug@localhost> OK
RCPT TO: <hogehoge@localhost>
250 Recipient <hogehoge@localhost> OK
DATA
354 Ok Send data ending with <CRLF>.<CRLF>
Subject: Hello! James!

Hello, James.
.
250 Message received

というように「RCPT TO:」を複数行書いてやれば実験できる。



copyright by K.Sugiura, 1996-2006