Hadoop本 6章 MapReduceの動作 の疑問点や気になる点について記述してください。
※記入者、該当ページ・該当行は忘れずに書いて下さい。

「ジョブ」と「タスク」の違い

  • [記入者] terurou
  • [該当箇所] 169〜190ページ(6章全般)
  • いわゆるバッチ処理で言われるジョブ・タスクと同等
  • ジョブ:コンピュータが処理する仕事の単位・プログラム群。HadoopではMapReduceプログラムを指す。
  • タスク:ジョブを構成するプログラム。HadoopではMapタスク・Reduceタスクなど、MapReduceプログラムの処理の一部を指す。

「MapReduceの裏側」の登場人物

  • [記入者] terurou
  • [該当箇所] 169〜175ページ(6.1 MapReuceジョブの実行の裏側)
  • クライアント
    • jobtrackerに対してジョブの投入
  • jobtracker
    • ジョブの実行管理(タスクのスケジューリング、進行状況管理など)
    • 単一障害点(SPOF)
  • tasktracker(群)
    • タスクの実行
  • 共有ファイルシステム(分散ファイルシステム)
    • HDFSなど
    • ジョブのリソースを共有

「MapReduceの裏側」のフロー

  • [記入者] terurou
  • [該当箇所] 169〜175ページ(6.1 MapReuceジョブの実行の裏側)
  1. ジョブの投入
    1. 新規ジョブIDの取得(クライアント→jobtracker)
    2. 入力スプリットの計算
    3. 取得したジョブIDにちなんだディレクトリにジョブのリソースをコピー(クライアント→共有ファイルシステム)
    4. ジョブ投入以後は1秒ごとにjobtrackerから最新の状況を問い合わせる
  2. ジョブの初期化
    1. スケジューラキューへの登録
    2. 入力スプリットごとにmapタスクを生成
    3. mapred.reduce.tasks で設定された数のreduceタスクを生成
  3. タスクの割り当て
    1. ハートビートでタスクが実行可能であることを通知する(tasktracker→jobtracker)
    2. スケジューリングアルゴリズムによってタスク実行ノードを確定、割り当て
      • 6.3 でスケジューリングアルゴリズムについて解説あり
  1. タスクの実行
    1. タスク同士を隔離するため、タスクを実行する度にJVM(プロセス)を起動
      • JVMの起動には1秒程度のオーバーヘッドがかかり、JIT(HotSpot JVM)も効きづらい
      • 6.5.2 でJVMの再利用について解説あり
  1. 進行状況とステータスの確認
    1. タスクは3秒ごとにtasktrackerへ情報を通知
    2. tasktrackerは5秒ごとにjobtrackerへハートビートを使って情報を通知
  2. ジョブの完了処理
    1. 最後のタスクが完了したら、jobtrackerはステータスを successful にする

データローカル?

  • [記入者] terurou
  • [該当箇所] 172ページ(6.1.3 タスクの割り当て)
  • 「スプリットが置かれているノード上でタスクが実行される」?
  • 「HDFSとtacktrackerを同一ノードで動作させている」状態のこと?

タスクの障害

  • [記入者] terurou
  • [該当箇所] 176〜177ページ(6.2.1 タスクの障害)
  • タスクが一定回数(デフォルトは4回)以上失敗した場合、ジョブ全体が失敗となる
  • 設定により、全タスクのうちの数%が失敗しても、ジョブを中断させずに結果を出力させることも可能
  • タスク失敗とみなす条件
    • タスクが実行時例外をスローする(Java MapReduce)
    • リターンコードに0以外を返す(Hadoop Streaming)
  • プロセス(JVM)がクラッシュする
    • タイムアウト。タスクプログラムが一定期間(デフォルト10分)以上、tasktrackerへ進行状況を通知しない場合

タスクの再実行

  • [記入者] terurou
  • [該当箇所] 176〜177ページ(6.2.1 タスクの障害)
  • タスクの実行が失敗した場合、それが失敗したtasktrackerとは別のtasktrackerで実行される。

tasktrackerの障害

  • [記入者] terurou
  • [該当箇所] 177ページ(6.2.2 tasktrackerの障害)
  • jobtrackerは障害の発生したtasktrackerノードを発見した場合、そのノードをtasktracker poolから切り離す
  • poolから切り離す際、そのノードで実行された「実行中ジョブのタスク」は、正常に完了していたとしても他のノードで再実行される(reduceタスクが障害ノードからデータを読み出せる保障がないため)
  • 障害ノードとみなす条件
    • tasktracker→jobtrackerへのハートビートが一定期間(デフォルト10分)送信されない

jobtrackerのブラックリスト

  • [記入者] terurou
  • [該当箇所] 177ページ(6.2.2 tasktrackerの障害)
  • ブラックリストに登録されると、そのノードにはタスクを割り当てられなくなる
  • ブラックリストに登録される条件
    • ハートビートを送信してこないtasktrackerノード
    • 他のtasktrackerノードと比較し顕著にタスクの成功率が低いtasktrackerノード
  • tasktrackerを再起動するとブラックリストから除外される

jobtrackerの障害

  • [記入者] terurou
  • [該当箇所] 177〜178ページ(6.2.3 jobtrackerの障害)
  • 現状のHadoopにjobtrackerでの障害には耐性がない。
  • ハードウェアの冗長化しか逃げ道がない。

ジョブスケジューラの種類

  • [記入者] terurou
  • [該当箇所] 178〜179ページ(6.3 ジョブのスケジューリング)
  • デフォルトではFIFOスケジューラ
    • 優先度の低いジョブがリソースを占有してしまう
  • フェアスケジューラ
    • 複数のクライアントからジョブが投入されると、各クライアントに公平にリソースを割り当てる
  • 他にもキャパシティスケジューラなどがあるらしい

プリエンプション?

  • [記入者] terurou
  • [該当箇所] 178ページ(6.3 ジョブのスケジューリング)
  • 優先度によってタスクの実行順序を動的に入れ替える機能
  • Wikipedia - プリエンプション

シャッフルとソート

  • [記入者] terurou
  • [該当箇所] 179〜182ページ(6.4 シャッフルとソート)
  • MapReduceの心臓部。継続的に改善が行われている。
  • map側
    • mapの出力は循環メモリバッファに書き出される
    • バッファサイズがしきい値(デフォルト100MB×0.8)をこえた場合、バッファの内容をスピルファイルとしてディスクに書き出す
    • mapタスク終了の前にスピルファイルを単一ファイルにマージする
    • combiner関数はスピルファイルが一定数(デフォルト3つ)以上あった場合に実行される
  • reduce側
    • コピーフェイズ:mapから並列で出力をコピーしてくる
    • ソートフェイズ(マージフェイズ):mapの出力をソート順序を保ちつつマージする
    • Reduceフェイズ:ソートフェイズの結果をreduce関数に渡す

MapReduceのチューニング

  • [記入者] terurou
  • [該当箇所] 182〜184ページ(6.4.3 設定のチューニング)
  • 可能な限りシャッフルにメモリを割り当てる(map関数reduce関数でなるべくメモリを使わないようなコードを書くべき)
  • mapで生成されるスピル数を小さくする
  • reduceの中間ファイルをメモリ上に持つ
  • バッファサイズ(デフォルト4KB)を増やす

投機的実行

  • [記入者] terurou
  • [該当箇所] 184〜186ページ(6.5.1 投機的実行)
  • 最適化の1つ、全体のスループットの向上を狙っている
  • 低速なタスクを検出した場合、同等のタスクをバックアップとして実行する
  • オリジナル・投機的タスクの先に終了した方の結果を採用し、終了しなかった方のタスクはkillされる
  • 投機的なタスクのスケジューリングが過度になるど、逆効果になる
  • [記入者] mzp
  • [該当箇所] 184〜186ページ(6.5.1 投機的実行)
  • なぜ再投入すると早くなるんでしょうか?
  • 各ノードの実行速度が違うから?
  • [記入者] terurou
  • 以下のようなことにより、ノードによってのタスクの実行速度が変わると考えられる
    • ハードウェアスペックの違い(異なるスペックのハードウェアが混在するクラスタ)
    • ハードウェア障害(ディスク・メモリ等)
    • ネットワークボトルネック(データとtasktrackerのネットワーク距離が遠い)

JVM再利用時の「状態の共有」?

  • [記入者] こうけつ
  • [該当箇所] 186下の方
  • 「同じジョブのタスク間で状態を共有したい場合」とあるが、当然ノードが異なればJVMは別になるはずなので、ノードをまたいでは状態を共有できないですよね?
  • ここで言いたいのは、インスタンスにコストの掛かるイミュータブルな定数フィールドを共有することで、高速化できると言うことなのかな。
  • [記入者] terurou
  • おそらく「同じジョブ」「同じノード」で動作するタスク間でデータ共有できるという意味じゃないかと思います。
  • 別ノードのJVMとはデータを共有するという話ではないものと私も解釈しました。。。

スキッピングモード

  • [記入者] terurou
  • [該当箇所] 187〜188ページ(6.5.3 不良レコードのスキップ)
  • mapper, reducerで不良レコードを見つけた場合はスキップするようなコードを書くべきだが、世の中うまくはいかない
  • スキッピングモードを有効にすると、2回タスクが失敗した場合、tasktrackerは例外の発生するレコードの位置を覚えて、そのレコードをスキップして処理を行う
  • この機能はデフォルトオフ

タスクの副作用ファイル

  • [記入者] terurou
  • [該当箇所] 189〜190ページ(6.5.4.2 タスクの副作用ファイル)
  • タスクから共有ファイルシステムへ直接出力することも可能
  • 通常のKey/Value以外の複雑な出力が欲しい場合に使う
  • タスク別に個別のディレクトリに出力、書き込みのバッティングを避ける

コメントをかく


「http://」を含む投稿は禁止されています。

利用規約をご確認のうえご記入下さい

どなたでも編集できます