> Super Technique 講座〜キュー(FIFO)

Super Technique 講座

キュー(FIFO)

キュー(FIFO)とは、データ構造から見た「パイプ」のことである。「パイプ」は言うまでもなく、UNIXユーザならばお馴染みのコマンドラインで使うアレである。標準入力を、別なプログラムの標準出力に繋ぎあわせて...という風にシェルレベルでは解説されるものだが、勿論プログラミングのレベルでもそれに相当するシステムコールがあるだけではなく、ライブラリとは無関係なプログラマが作るデータ構造としても重要である。ここでは、「キュー(FIFO)」という見地で広く全体像を捉えてみよう。


キュー(FIFO)とは?

そのそも FIFO とは、「First In, First Out」の略であり、「最初に入ったものが最初に出てくる」データ構造である。勿論FIFOはデータ構造のレベルで実装することができるのだが、便利なのでさまざまなライブラリなどの手段もある。

この FIFO は大変便利であるために、UNIXのいろいろな部分で利用されている。シェルにおけるパイプなどは一番判りやすい例だが、実は汎用的なプロセス間通信手段であり、この側面が非常に重要である。また、さまざまな割り込みハンドラ(ハードウェア割り込みからシグナルハンドラに至るまで)で取得されたデータを、一時保存しておく手段としても活用されているし、マルチスレッド・プログラミングでスレッド間でデータを受渡しする手段としても重要である(ここらへんについて知りたい人は「シグナルとコールバック」を参照のこと)。

ではこれほど重宝するデータ構造であるのだが、「なぜか」について考えてみよう。

  1. 「複数の」データを一時置いておくことができる。
  2. しかも、そのデータの順番を「保存」しておくことができる。
  3. うまく実装すれば、非同期な複数の入力がある場合に、それらが混ざり合わないようにうまく順序づけできる。

つまり、FIFO は、非同期に取得されるデータの交通整理ができるのである。これは大変重要な機能である。だから、次のようなかたちで、UNIXシステムとしての実装があり、ユーザは無意識的にこれの恩恵を受けているのである。

  1. プログラマがプログラム内部で作るデータ構造として。
  2. いわゆる「カーネルパイプ」として、カーネルの内部に確保されたパイプ用途のメモリ領域があり、これを経由して複数のプロセスがデータを送り合うことができる(複数のスレッドでも良いに決まっている)。これのインターフェイスが pipe(2) である。
  3. しかし、pipe(2) は低レベルのシステムコールであり、一般用途としてはやや使いにくい。そのため、ライブラリ関数として popen(3) が用意されており、system(3) のようなつもりでシェルサービスと共に利用できるようになっている。
  4. ファイルシステムとして実現されている FIFO。mknod(1) を使うことでファイルのようなかたちで作成することができ、これも当然プロセス間通信に利用できる。筆者が使っている Linux システムでは、次の FIFO ファイルがあるが、勿論アプリケーションの用途としてプログラマが作成することもありうる。
    /dev/initctl
    /sbin/init が利用するチャンネル。
    /proc/[プロセス番号]/maps
    現在実行中のプロセスの、いわゆる「メモリマッピング」を覗くことができる。どんな共有ライブラリを使っているのか、マル見えである。


データ構造としてのFIFO(連結リスト)

ではデータ構造として、プログラマが制作するFIFOを見てみよう。まず、一番単純なモデルは、連結リストで実装するものである。具体的に保存するデータは何でもよいのだが、まあ、ここでは int の値としておこう。当然ポインタに取ってやればどんなデータでも保持できる。データ構造は単なる連結リストである。

struct FifoItem {
   int val;               /* 値 */
   struct FifoItem *next; /* 次へのポインタ */
}; 

しかし、単なる連結リストとは使い方が違う。こういう風にする。

struct FifoItem Base;        /* ダミーのルート */
struct FifoItem *Rp = &Base; /* 読みだしポインタ(読み出す位置の一つ前を保持) */
struct FifoItem *Wp = &Base; /* 書き込みポインタ(追加位置を保持) */

/* キューへの追加処理。追加は末尾に。処理は簡単。 */
void put( int v )
{
   struct FifoItem *ret;

   ret = (struct FifoItem *)malloc(sizeof(struct FifoItem));
   if( ret == NULL ) {
      fprintf( stderr, "cannot malloc\n" );
      exit( 1 );
   }
   /* 新しいデータのセット */
   ret->next = NULL;
   ret->val = v;

   /* Wp の移動 */
   Wp->next = ret;
   Wp = ret;
}

/* キューからデータを取得し削除する処理。一応、キューが空ならば -1 を返す実装。
   値は参照渡しのポインタに入る */
/* 現実には Rp は読み出し位置の一つ前を保持している。 */
int get( int *v )
{
   struct FifoItem *tmp;  /* 削除作業用 */

   /* まず、値があるかどうかチェック */
   if( Rp->next == NULL ) {
       return -1;
   }

   tmp = Rp;        /* 削除作業用 */
   Rp = Rp->next;   /* ポインタを進める */
   *v = Rp->val;    /* 値の取得 */

   /* だから、今参照されたデータの1つ前のデータが解放される */
   if( tmp != &Base ) {
       free( tmp );
   }
   return 0;
}

だから、雰囲気的にはスタックに似ている。大きな違いは、スタックでは POP も PUSH も、共通の現在位置である SP によって行っているが、FIFO では読み/書きがそれぞれ独立のポインタ(Rp, Wp)で行うことにある。また、スタックでは最も「未処理」なデータ(要するに一番最初に入ったデータ)の位置は不変(==0)だが、FIFOでは最も「未処理」なデータ(この場合は一番最後に入ったデータ)の位置は、最後であるから不定である。このため、配列では実装がしにくく、今のように連結リストで実装するのがやりやすい(そりゃスタックだって連結リストで実装できるぞ)。まあ、適当に数値を入力し、もし改行のみならば1つづつ get() して表示するFIFOのドライバならこんなところである。

int main()
{
   char buff[256];
   int v;

   while( fgets(buff,255,fp) ) {
      if( buff[0] == '\n' ) {
          if( get( &v ) == -1 ) {
              printf( "FIFO is empty!\n" );
          } else {
              printf( "FIFO: value=%d\n", v );
          }
      } else {
          v = atoi(buff);
          put( v );
          printf( "FIFO: put %d\n", v );
      }
  }
}

この処理ではサボっているが、マルチスレッドに対応する場合には、これはかなりマズい。少なくとも put() の

   /* Wp の移動 */
   Wp->next = ret;
   Wp = ret;

および get() の

   /* まず、値があるかどうかチェック */
   if( Rp->next == NULL ) {
       return -1;
   }

   tmp = Rp;        /* 削除作業用 */
   Rp = Rp->next;   /* ポインタを進める */
   *v = Rp->val;    /* 値の取得 */

はいわゆる「クリティカル・セクション」になって、何が何でも一連の動作で行われなくてはならない(別スレッドが再入するとデータの一貫性が壊れる)部分である。マルチスレッドを想定する場合には、必ず mutex で保護する必要がある(Java なら synchronized メソッドあたりにするわけだが)。


データ構造としてのFIFO(リングバッファ)

しかし、今見た連結リストによるFIFOは、万能ではない。一般的にFIFOは「とりあえず」データを保持しておくのに使われるのだが、このウラには「今すぐに処理できない」ワケがあることが多い。この理由の部分でもっとも多いのは、データの取得がハードウェア割り込みハンドラやシグナルハンドラで行われるために、時間のかかる処理が出来ない、というケースなのである。

連結リストの場合には、本質的に malloc & free が行われる。だから、現実の上ではほぼ無限に近くデータを保持することができる。しかし、malloc & free 動作は、本質的に「高価な」処理である。出来ればシグナルハンドラで malloc & free することは避けたい、というのがプログラマの本能である。これはこういう問いを自分にすると良い。

この処理をしていて、malloc が出来なくて処理がアボートするのが現実的か、それともFIFO に溜るデータが多すぎるために、データを拾い落してしまうのが現実的か?

シグナルハンドラや割り込みハンドラの場合、「データを拾い落してしまう」方が現実的である。つまり、FIFOでデータを貯めておくバッファは有限(それもそんなに大きくない)であるべきである。

勿論、先程の連結リストの malloc に手を加えて、あらかじめ malloc() か配列で確保しておいたバッファを使う、というのもやり方である。しかし、この場合には next ポインタを保持しておく理由がなくなる。そこで登場するのが、「リングバッファ」である。

「リングバッファ」は配列の「使い方」である。配列は当然先頭と末尾がある、一次元のデータである。だから、次のデータにアクセスする時に添字を1つ増やすのだが、次の添字の計算に

    i = (i + 1) % sizeof(Array); /* i++ の代わりに.. */

としてやることで、末尾を越えたら先頭に戻るようにしてやることができる。つまり、あたかも配列が「輪っか」になっているように扱うのが、この「リングバッファ」である。これに対して、先程と同様に読みだしポインタ Rp と書き込みポインタ Wp を定義してやるのである。リングバッファによる FIFO は大体次のようなものになる。

#define RINGSIZE  5
int Ring[RINGSIZE];  /* リングバッファ */
int Rp = 0;          /* 読み込みポインタ */
int Wp = 0;          /* 書き込みポインタ */

void put( int val )
{
   int next = (Wp + 1) % RINGSIZE;
   if( next == Rp ) {
      fprintf( stderr, "リングバッファが満杯です。読まない限りデータを落します\n" );
      return;
   }
   Ring[Wp] = val;
   Wp = next;
}

int get( int *v )
{
   if( Rp != Wp ) {    /* リングバッファが空でない */
      *v = Ring[Rp];
      Rp = (Rp + 1) % RINGSIZE;
      return 0;
   } else {            /* リングバッファが空 */
      return -1;
   }
}

int isEmpty( )
{
   return  (Rp==Wp)?(1):(0);
}

この時、RINGSIZE よりも1つだけ小さな数しか配列にデータを保持できないロジックになっている。これは全部使ってしまうと、ロジックの上で、「Rp == Wp」がバッファが満杯の状態を表すのか、それともバッファが空の状態を表すのか、区別ができなくなる、という奥深い理由があるのである。今のケースでは「Wp == Rp」は常にバッファが空を示し、「(Wp + 1) % RINGSIZE == Rp」は常にバッファが満杯を示している。勿論フラグを立ててこの2つの状態を区別することも出来るが、バッファサイズより1つ少なく使う方が筆者は好きである。単純なロジックであるが、実は結構深いのである。

勿論、マルチスレッドで使う場合には、これら put(), get() は mutex で保護する必要があるし、シグナルハンドラでは割り込みを禁止して使う必要があることは言うまでもない(シグナルハンドラなら POSIX シグナルシステムコールのうちの sigprocmask(2) で保護すべきだし、デバイスドライバの割り込みハンドラならば cli(), sti() で保護するな)。

だから、割り込みハンドラの場合には、それがすべき処理を「短時間で出来る処理」と「どうしても長時間かかる処理」にうまく分割し、「短時間で出来る処理」を本来の割り込みハンドラとして実装し(いわゆる「トップハーフ」)、「長時間かかる処理」は通常のメインルーチンの流れの中で、適当なフラグによって処理ルーチンを起動するか、それともそれを処理するルーチンのスレッドのスケジューリングを上げてやって、処理ルーチンがそのうちされるようにする(いわゆる「ボトムハーフ」)、という二段階に分けて処理をする場合が多い。このトップハーフの処理では、FIFO に取得されたデータを put () し、ボトムハーフの処理で FIFO からデータを get() する、という具合に使われるのである。


pipe(2) の使い方〜双方向パイプ

さて、プログラマがFIFOを自前で用意してやることは、非常に有益な場合が多いが、それよりも一般アプリケーション・プログラマの場合には、他のプロセスとの間での通信手段としてFIFOを使うケースの方がずっと多い。このため、UNIXにはシステムコールとしてカーネルに準備された特別なメモリエリアである「カーネルパイプ」を利用して通信をするためのシステムコールと、ファイルシステムとして準備された FIFO を使うやり方と2通りがある。ここではまず、システムコールの使い方を解説しよう。

一番多いケースは、読み込み用パイプを作成し、何かの出力をするプログラムを起動して、その出力を取得する、という使い方である。この時、そのプログラムが望み通り動作するためのデータは起動引数で与えることになる。この用途ではわざわざシステムコールを使うよりも、system(3) の要領でシェルによるコマンドライン解析を任せて、サブシェルとして起動するライブラリである popen(3) が使える。これの使い方はカンタンだ。

現在、動作中のプロセスのプロセス番号をすべて表示するプログラムを書いてみよう。popen(3) を使って「ps -ax」を実行し、その出力を通常のファイル入力であるかのように処理できる。

#include <stdio.h>

int main()
{
   FILE *fp;
   char buff[256];

   if( (fp = popen( "ps -ax", "r" )) == NULL ) {
       fprintf( stderr, "cannot invoke ps!\n" );
       exit( 1 );
   }
   fgets( buff, 255, fp );  /* 最初1行はタイトルのなので読み捨てる */
   printf( "Now running Process are :\n" );
   while( fgets( buff, 255, fp ) ) {
       int pid = atoi(buff);
       printf( "%d\t", pid );
   }
   printf( "\n" );
   pclose( fp );  /* close処理だけ、pclose(3) である */
}

事実上、このやり方が一番移植性のある「すべてのプロセス番号取得」プログラムである(ps コマンドのフラグにはちょっと注意が必要)。現実には「すべてのプロセス番号を取得する」システムコールがUNIXにはないので、もしこれでやらなければOS固有のやり方(Linux だと /proc のディレクトリ内容をスキャンして数値のディレクトリ名を拾う)をしなくてはならなくなる。これは大変バカバカしく、しかも移植性を欠く。

この逆方向、つまり「入力としてパイプで与える」というのも、popen(3) で出来る。たとえば、/usr/bin/passwd あたりを起動してパスワード変更をする、というような用途があるわけだ。しかし、両方は popen(3) では出来ない。つまり、入力・出力ともパイプに繋いで、入力をパイプに流し込んで起動したプログラムに処理させ、結果を別なパイプで取得する、というのは popen(3) では出来ないのである。これをするには、より低レベルに降りていかなくてはならない。つまり、popen(3) が使っているシステムコールである、pipe(2) を直接使わなくてはならないのである。

パイプ実行の解説に当たって、以下の事実を指摘しておく。

  1. サブプロセスを起動するためには、その前提として fork(2) することが必要である。つまり、プロセスの子プロセスとして、自分のコピーを作り、子プロセスが exec(2) システムコールのインターフェイスのどれかを発行して、そのプログラムのプロセスに変身する必要がある。
  2. fork(2) の効果の一つとして、親プロセスが開いていたファイルデスクリプタはすべて子プロセスでも有効である、という事実がある。つまり、親プロセスのファイルデスクリプタを管理する、プロセスに付属するテーブルの内容(具体的なデスクリプタの状態)は、親と子で共有されているのである(テーブルエントリ自体は変化しうる)。だから、普通は子プロセスの側で親が開いたファイルを勝手にクローズすることはない。
  3. 子プロセスが exec(2) しようとも、ファイルデスクリプタテーブルの状態は特に変化しない。つまり、fork(2) した後で、適切に標準入出力を変更すれば、その結果は exec(2) された任意のプログラムに引き継がれる。

さて、以上の事実を基にすれば、双方向パイプを実現できる。そのキーとなるシステムコールは次のものである(双方とも unistd.h にある)。

int pipe(int filedes[2]);
これはパイプを作成するシステムコールであり、2つのファイルデスクリプタを返す。filedes[0] は読み込み用、filedes[1] は書き込み用である。だから、これらの2つをそれぞれ親プロセス、子プロセスの担当として通信する。だから、双方向パイプの場合には、2回 pipe(2) を発行して2組(4つのデスクリプタ)が必要になる。
int dup2(int oldfd, int newfd);
子プロセスは exec(2) されたプログラムの標準入出力を「ダマさなくては」ならない。起動されたプログラムの側では標準入出力に読み書きしているのだが、実はこれはパイプへの読み書きになっている... というのが求めるところである。だから、ファイルデスクリプタの 0(標準入力)と 1 (標準出力) を強制的にパイプのものに割り当てなくてはならない。これに使うのが dup2(2) である。つまり、
    int desc[2];
    if( pipe( desc ) == -1 ) {
        /* エラー処理 */
    }
    if( fork() == 0 ) {  /* 子プロセス側 */
       dup2( desc[1], 1 );  /* 以降、標準出力はパイプに繋がる */
       system( "ps -ax" );
というのが、実際には popen(3) のしていたことなのである。

では、本題の双方向パイプのサンプルを出そう。このサンプルでは起動第1引数に実行したい任意のパイプを指定する。そして、標準入力を受け付けて、各行ごとにパイプを実行し、その結果を表示する。つまり、

% ./filter 'wc'
dadsfadfadf doahf;ie  kajfa p asldjfak jp 
      1       6      43
asdfadf
      1       1       8

の要領で動作する。子プロセスとして起動しているのは /bin/sh なので、第1引数としてパイプを含もうと引数を含もうと一切関係がない。

% ./filter 'cat | wc -c'
dafda dadfae dva
     17
dafa d
      7
#include <stdio.h>
#include <unistd.h>

int do_pipe( char *command, char *content, int conlen )
{
     int p1[2], p2[2];  /* 2組のデスクリプタ */
     int ret;           /* 戻り値 */

     /* パイプの作成 */
     if( pipe( p1 ) == -1 ){  /* 0=子読み、1=親書き */
          printf( "cannot open pipe\n" );
          return -1;
     }
     if( pipe( p2 ) == -1 ){  /* 0=親読み、1=子書き */
          printf( "cannot open pipe\n" );
          return -1;
     }
     ret = 0;
     if( fork() ){   /* 親側 */
          close( p1[0] ); close( p2[1] );  /* 不要なデスクリプタは閉じる */
          /* パイプに書き込む */
          if( write( p1[1], content, strlen(content) ) <= 0 ){
               close( p1[1] ); close( p2[0] );
               fprintf( stderr, "pipe write error\n" );
               return -1;
          }
          close( p1[1] );  /* 書き込み終了を告げる */
          wait( &ret );    /* 子プロセスの終り待ち */
          if( ret == 0 ){  /* 正常終了 */
               int len;
               len = read( p2[0], content, conlen ); /* 結果読み込み */
               content[len] = '\0';
          } else {
               ret = -1;
          }
          close( p2[0] );

     } else {        /* 子側 */
          close( p1[1] ); close( p2[0] );  /* 不要なデスクリプタは閉じる */
          close( 1 );        /* 標準出力を閉じ */
          dup2( p2[1], 1 );  /* パイプの出力につなぐ */
          close( 0 );        /* 標準入力を閉じ */
          dup2( p1[0], 0 );  /* パイプの入力につなぐ */
          /* /bin/sh の実行 */
          execl( "/bin/sh", "sh", "-c", command, NULL );

          /* 以降のコードは exec(2) がエラーでない限り実行されない */
          fprintf( stderr, "load program /bin/sh\n" );
          perror("");
          exit(1);
     }
     return( ret );
}


int main( int argc, char **argv )
{
     char buff[256];

     if( argc != 2 ) {
          fprintf( stderr, "fileter command\n" );
          exit( 1 );
     }
     while( fgets( buff, 255, stdin ) ) {
          do_pipe( argv[1], buff, 256 );
          printf( "%s", buff );
     }
}

とはいえ、行指向プログラムなのに read(2), write(2) を使うのが気分が悪い方もおろう。それならば、次の手が使える。fdopen(3) である。

#include <stdio.h>

FILE *fdopen (int fildes, const char *mode);

このライブラリ関数は、第一引数のファイルデスクリプタから、FILE 構造体を再構築する。これによって stdio.h 風の処理ができるようになる。親処理の部分だけ示す。

     if( fork() ){  /* 親側 */
          FILE *fp1, *fp2;
          close( p1[0] ); close( p2[1] );

          /* fdopen(3) でFILE構造体を作る */
          if( (fp1 = fdopen( p1[1], "w" )) == NULL ) {
               fprintf( stderr, "cannot fdopen(3)!\n" );
               return -1;
          }
          if( (fp2 = fdopen( p2[0], "r" )) == NULL ) {
               fprintf( stderr, "cannot fdopen(3)!\n" );
               return -1;
          }

          /* だったら fprintf(3) だって使える */
          if( fprintf( fp1, "%s", content ) < 0 ){
               fclose( fp1 ); fclose( fp2 );
               fprintf( stderr, "pipe write error\n" );
               return -1;
          }
          fclose( fp1 );  /* 当然 fclose(3) で閉じる */
          wait( &ret );
          if( ret == 0 ){
               /* 当然 fgets(3) だって使える */
               fgets( content, conlen, fp2 );  
          } else {
               ret = -1;
          }
          fclose( fp2 );

まあ、これは趣味と用途に応じてだろう。


名前付きパイプ

さて、カーネルパイプと比較すると、それほど活用されているという感じがしない「名前付きパイプ」(ファイルシステムの上のパイプ)である。しかし、これは親子関係にない(あるいは親子関係を「忘れた」)プロセスの間でデータを交換するのに非常に便利なものである。それだけではない。ファイルシステム固有の排他制御によって、名前付きパイプも、複数のプロセス(スレッド)によって同時に書き込みがなされようとした時に、一方をブロックする。だから、適切に書かれていさえすれば、特に mutex に気を配らなくても、同時の書き込みが混じり合うことはない。これは大変便利なため、スレッドプログラミングでも活用すべきだろう。

名前付きパイプは mknod(1) で(あるいはプログラム上では mknod(2) で)作成する。次の通り。

% mknod fifo p

第1引数がファイル名、第2引数の p が名前付きパイプを示している。ls -l で見てみると、次のように表示する。

% ls -l fifo
prw-r--r--   1 sug     users            0 Sep 28 00:57 fifo|

実験してみよう。3つの端末を開いておく。2つは入力、1つは出力である。出力側は「cat fifo」とでもしておいて、ほっておく。2つの入力端末からは「cat >fifo」の要領で書き込んでいくのである。そうすると、両方とも書き込んだ時に、それが出力端末に表示されることを確認しよう。これは要するに、write(2) がスレッドセーフなシステムコールであるために、write(2) の単位では絶対に混じり合わないのである。だから、write(2) で正しい単位を定め(1行とか、何かのサイズが一定の構造体単位とか)、その単位で書き込んで、読み側はその単位で読み込めば普通のファイルオープン処理によって、FIFO が手に入るのである。勿論、1つのプロセスがすべての内容を書き終るまでロックすることもできる。それは通常のファイルシステム操作と同様で、flock(2) によってロックをすれば良いだけである。例えば、複数のファイルをオープンし、ファイル単位でロックをしながらFIFOに書き込む処理だと次のようになる。

#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/file.h>

int fifo_proc( char *file )
{
     FILE *fp;
     int pipe;
     char buff[256];
     char title[512];
     int len;

     /* パイプのオープン。念のためシステムコールで。 */
     if( (pipe = open( "./fifo", O_RDWR )) == -1 ) {
          fprintf( stderr, "cannot open FIFO\n" );
          return -1;
     }
     flock( pipe, LOCK_EX );  /* ファイルロック */

     /* ターゲットファイルのオープン */
     if( (fp = fopen( file, "r" )) == NULL ) {
          fprintf( stderr, "cannot open %s\n", file );
          return -1;
     }

     /* タイトルとしてファイル名をを入れておく */
     sprintf( title, "######## %s #########\n", file );
     write( pipe, title, strlen(title) );

     /* 各行処理 */
     while( (len = fread( buff, 1, sizeof(buff), fp )) > 0 ) {
          write( pipe, buff, len );
     }
     fclose( fp );
     close( pipe );  /* close(2) でロックは自動的に外れる */
}

int main( int argc, char **argv )
{
     while( *++argv ) {
          if( fifo_proc( *argv ) == -1 ) {
               return 1;
          }
          sleep( 1 );  /* まあ、これは入れずにやるとかいろいろやってみて! */
     }
     return 0;
}

これならば、各ファイルは混じり合わずに出力される。読み側が読み込みをサボると、書き出しがブロックされて抑制される動作をする。当然これは同一プロセスのスレッド間の動作でも問題がないのである。まあ、ホントは POSIX で fcntl(2) についてファイルロックの仕様が決められたために、flock(2) ではなくて、fcntl(2) を使うべきなのだが、許して欲しい。



copyright by K.Sugiura, 1996-2006