「整列 (sorting) 」の続きです。今回は「連結リスト (linked list) 」と「文字列 (sting) 」のソートについて取り上げます。連結リストや文字列は今まで説明したアルゴリズム、たとえばクイックソートでもソートすることが可能です。ですが、連結リストをソートする場合は、クイックソートよりもマージソートの方が適しています。
文字列のソートアルゴリズムは、基数ソートの一種である MSD radix sort やクイックソートの変形である「マルチキークイックソート (multikey quicksort) 」などがあります。単純なクイックソートでは時間がかかる場合、たとえば Suffix Array というデータ構造を構築する場合は、これらのアルゴリズムが使われています。
まずは、連結リストのソートから説明しましょう。連結リストはセルを一方向につなげたデータ構造なので、配列のようにランダムアクセスすることはできません。このため、クイックソートのように配列を直接書き換えるアルゴリズムを、そのまま連結リストに適用することはできません。ただし、新しい連結リストを作ってもよいのであれば、連結リストでもクイックソートすることは可能です。
基本的な考え方は簡単です。クイックソートは枢軸を基準にして、要素をそれより大きいものと小さいものの 2 つに分割していくことでソートを行います。枢軸は要素の中から適当な値を選んでいいのですが、連結リストの場合は任意の箇所を簡単に選ぶことができません。この場合は、いちばん簡単に求めることができる先頭の要素を枢軸とします。
次に連結リストを 2 つに分けます。このとき、要素は新しい連結リストに格納します。そして、それらの連結リストを再帰呼び出しして同様にソートします。最後に、その結果を枢軸を挟んで結合します。これを図に表すと次のようになります。
5 3 7 6 9 8 1 2 4 5 を枢軸に分割 (3, 1, 2, 4) 5 (7, 6, 9, 8) 3を枢軸に分割 7を枢軸に分割 (1, 2) 3 (4) | 5 | (6) 7 (9, 8) ・・・分割を繰り返していく・・・ 図 : 連結リストのクイックソート
このように連結リストを分割していくと、最後は空リストになります。ここが再帰の停止条件になります。あとは分割した連結リストを結合すればいいわけです。
それではプログラムを作りましょう。最初に連結リストを定義します。
リスト : 連結リスト class LinkedList: class Cell: def __init__(self, x, y = None): self.data = x self.next = y def __init__(self): self.size = 0 self.top = None
クラス名は LinkedList とし、その中でセルを表すクラス Cell を定義します。Cell のインスタンス変数 data が要素を、next が次のセルへの参照を格納します。LinkedList のインスタンス変数 size が要素数を、top が最初のセルへの参照を格納します。このほかに、必要最低限なメソッドとして push, pop, __str__ を定義しています。詳細は プログラムリスト1 をお読みください。
次はクイックソートのプログラムを示します。
リスト : クイックソート def quick_sort(self): def append(x, y): if x: cp = x while cp.next: cp = cp.next cp.next = y return x return y def qsort(cp): if cp: pivot = cp.data low = None high = None cp = cp.next while cp: if cp.data < pivot: low = LinkedList.Cell(cp.data, low) else: high = LinkedList.Cell(cp.data, high) cp = cp.next return append(qsort(low), LinkedList.Cell(pivot, qsort(high))) return None # self.top = qsort(self.top)
内部関数 append は連結リスト x と y を結合します。この append は Lisp の関数 nconc と同じで、x の最後尾のセルを破壊的に修正して y を連結します。ご注意ください。
クイックソートは内部関数 qsort で行います。変数 pivot が枢軸を表します。while ループで連結リストをたどって要素を取り出します。そして、枢軸と比較して小さい場合は連結リスト low に、大きい場合は連結リスト high に格納します。これで枢軸を基準にデータを 2 つの連結リストに分けることができました。あとは、qsort を再帰呼び出しして、その結果を append で結合します。
それでは簡単な実行例を示します。
# 簡単なテスト if __name__ == '__main__': import random a = LinkedList() for x in xrange(10): a.push(random.randint(0,100)) print a a.quick_sort() print a
LList(53, 56, 41, 26, 89, 89, 100, 33, 45, 28) LList(26, 28, 33, 41, 45, 53, 56, 89, 89, 100)
整列 [1] で説明したように、クイックソートは高速なアルゴリズムですが、枢軸の選び方によっては遅いソートと同じになってしまいます。今回のように、リストの先頭要素を枢軸として選ぶ場合、リストの要素が昇順または降順に並んでいると最悪の結果になります。このため、クイックソートをプログラムする場合、枢軸の選び方を工夫するのが一般的です。
ただし、要素を数個選んで中間の値を枢軸とする方法は、連結リストに不向きであることに注意してください。たとえば、要素が 1000 個ある場合、配列であれば 0, 500, 999 番目の要素を取り出すのは簡単ですが、連結リストでは要素数が多くなるほど、後ろの要素を取り出すのに時間がかかるようになります。先頭から 3 つの要素を取り出して枢軸を選んだとしても、降順または昇順に並んだデータには効果が無いのは明らかです。この点でも、クイックソートは配列に向いているソートアルゴリズムといえます。
次は、クイックソートと同様に高速なアルゴリズムであるマージソートを取り上げます。マージソートの場合、連結リストを直接書き換えながらソートすることができます。さっそくプログラムを示しましょう。次のリストを見てください。
リスト : 連結リストのマージソート def merge_sort(self): def merge_list(x, y): head = LinkedList.Cell(None) cp = head while x and y: if x.data <= y.data: cp.next = x cp = x x = x.next else: cp.next = y cp = y y = y.next if x: cp.next = x else: cp.next = y return head.next def msort(x, n): if n == 1: x.next = None return x else: m = n / 2 y = x for _ in xrange(m): y = y.next return merge_list(msort(x, m), msort(y, n - m)) if self.size > 1: self.top = msort(self.top, self.size)
内部関数 merge_list が連結リストをマージ (併合) する関数で、ソート済みの連結リスト x と y を一つの連結リストにまとめます。この関数は連結リストを破壊的に修正してマージします。最初に連結リストのヘッダ head を用意します。このあとに、連結リストをつないでいきます。変数 cp は最後尾のセルを示します。
マージ処理はとても簡単です。while ループで x と y にデータがある間、2 つのデータ x.data, y.data を比較し、小さいほうのセルを cp の後ろ (cp.next) につなげます。そして、cp の値をつなげたセルに更新して、次のセルへ進めます。while ループが終了して、x に連結リストが残っていれば、それを cp の後ろにつなげます。x が空リストであれば y に残っている連結リストをつなげます。最後に、return でマージしたリスト head.next を返します。
実際のマージソートは内部関数 msort で行います。x がソートする連結リスト、 n が連結リストの長さを表します。msort は連結リストを分割する処理で、新しい連結リストを作らないことに注意してください。次の図を見てください。
引数 x | |←──── 長さ n ───→| (1, 2, 3, 4, 5, 6, 7, 8) |← n/2 →| |← m ─→| | | m = (n - n/2) 引数 x 引数 y 図 : 連結リストの分割
msort はソートする連結リストの範囲を開始位置と長さで表しています。上図の連結リストを二分割する場合、前半部分は x と n/2 で表し、後半部分を y と (n - n/2) で表します。y は連結リストを n / 2 回たどれば求めることができます。
n が 1 になったら x.next を None に書き換えます。これが再帰の停止条件で、要素数が一つの連結リスト、つまりソート済みの連結リストを返すことになります。n が 1 よりも大きい場合は、連結リストを二分割して msort を再帰呼び出しし、その結果を merge_list でマージすればいいわけです。
それでは簡単な実行例を示します。
# 簡単なテスト if __name__ == '__main__': import random b = LinkedList() for x in xrange(10): b.push(random.randint(0,100)) print b b.merge_sort() print b
LList(19, 51, 26, 1, 60, 83, 54, 57, 47, 75) LList(1, 19, 26, 47, 51, 54, 57, 60, 75, 83)
次はクイックソートとマージソートの実行速度を比較してみましょう。乱数データを 1000, 2000, 4000, 8000 個を作ってソートします。結果は次のようになりました。
表 : 実行結果 (単位 : 秒) 個数 : quick merge -------------------- 1000 : 0.087 0.066 2000 : 0.180 0.145 4000 : 0.445 0.317 8000 : 1.053 0.689 実行環境 : Windows XP, celeron 1.40 GHz, Python 2.4.2
クイックソートよりもマージソートのほうが速くなりました。連結リストのソートはマージソートのほうが適していることがわかります。Python の場合、連結リストのソートはマージソートでも時間がかかりますね。興味のある方は他のプログラミング言語、たとえばC言語や Lisp (Scheme) などで試してみてください。
# coding: utf-8 # # listsort.py : 連結リストのソート # # Copyright (C) 2007 Makoto Hiroi # class LinkedList: class Cell: def __init__(self, x, y = None): self.data = x self.next = y def __init__(self): self.size = 0 self.top = None def pop(self): value = None if self.top: value = self.top.data self.top = self.top.next self.size -= 1 return value def push(self, x): self.top = LinkedList.Cell(x, self.top) self.size += 1 def __str__(self): cp = self.top if not cp: return 'LList()' s = 'LList(' while cp.next: s += '%s, ' % cp.data cp = cp.next s += '%s)' % cp.data return s # クイックソート def quick_sort(self): def append(x, y): if x: cp = x while cp.next: cp = cp.next cp.next = y return x return y def qsort(cp): if cp: pivot = cp.data low = None high = None cp = cp.next while cp: if cp.data < pivot: low = LinkedList.Cell(cp.data, low) else: high = LinkedList.Cell(cp.data, high) cp = cp.next return append(qsort(low), LinkedList.Cell(pivot, qsort(high))) return None # self.top = qsort(self.top) # マージソート def merge_sort(self): def merge_list(x, y): head = LinkedList.Cell(None) cp = head while x and y: if x.data <= y.data: cp.next = x cp = x x = x.next else: cp.next = y cp = y y = y.next if x: cp.next = x else: cp.next = y return head.next def msort(x, n): if n == 1: x.next = None return x else: m = n / 2 y = x for _ in xrange(m): y = y.next return merge_list(msort(x, m), msort(y, n - m)) # if self.size > 1: self.top = msort(self.top, self.size) # 簡単なテスト if __name__ == '__main__': import random a = LinkedList() for x in xrange(10): a.push(random.randint(0,100)) print a a.quick_sort() print a b = LinkedList() for x in xrange(10): b.push(random.randint(0,100)) print b b.merge_sort() print b
クイックソートは汎用的なソートアルゴリズムですが、ソートするデータによっては他のアルゴリズムの方が高速にソートできる場合があります。1997 年に Jon Bentley と Robert Sedgewick が発表した「マルチキークイックソート (Multikey Quicksort) 」は文字列のソートに適した高速なアルゴリズムです。このほかに前回説明した基数交換法 (MSD radix sort) も文字列のソートに適したアルゴリズムです。今回はマルチキークイックソートを簡単に説明して、実際にテキストファイルを行単位でソートするプログラムを作ってみましょう。
なお、マルチキークイックソートの詳しい説明は Robert Sedgewick の Sorting and Searching Strings で公開されている Bentley と Sedgewick の論文 "Fast Algorithms for Sorting and Searching Strings" をお読みください。
ところで、Python などのスクリプト言語で文字列を単純にソートする場合、組み込みのソート関数を使ったほうが高速です。本ページはアルゴリズムの学習が目的であり、Python でマルチキークイックソートや MSD radix sort をプログラムしたからといって、組み込み関数 sort よりも速くなるわけではありません。蛇足だとは思いますが、念のためご注意申しあげます。
マルチキークイックソートで文字列をソートする場合、普通のクイックソートと大きく異なる点が 2 つあります。一つは文字単位で比較を行うことです。文字列をソートする場合、一般的なソートは文字列単位で比較を行います。ところが、マルチキークイックソートは最初に 1 文字目を比較してソートを行い、ソートが完了しない場合(同じ値が複数ある場合)は、さらに 2 文字目を比較してソートを行う、というように先頭から順番に文字を比較してソートを行います。
もう一つは区間の分け方です。普通のクイックソートは枢軸を基準にして、小さい要素と大きい要素の 2 つの区間に分割していくことでソートを行います。マルチキークイックソートは枢軸を基準にするところは同じですが、区間を二分割するのではなく、小さい要素、枢軸と等しい要素、大きい要素の三分割にするところが特徴です。このため、マルチキークイックソートは Ternary Quicksort と呼ばれる場合もあります。
たとえば、n 番目の文字を比較して区間を三分割したとしましょう。小さい要素と大きい要素の区間は n 番目の文字でソートを続行すればいいですね。これは普通のクイックソートと同じです。枢軸と等しい要素が複数ある場合、n 番目の文字ではソートが完了しないので、次は n + 1 番目の文字を比較してソートを行います。このように枢軸と等しい要素を集めることで、その区間は次の文字へ進めることができるわけです。
このアルゴリズムを疑似コードでプログラムすると、次のようになります。
リスト:マルチキークイックソート multikey_quicksort(buff, low, high, n): if 区間のデータ数が LIMIT 以下: 単純なソートアルゴリズムに切り替えてソート else: n 番目の文字で枢軸を選択して区間 (low, high) を 3 分割する # < : (low,- m1-1), = : (m1, m2-1), > : (m2, high) multikey_quicksort(buff, low, m1 - 1, n) if n 番目の文字 != 文字列の終端: multikey_quicksort(buff, m1, m2 - 1, n + 1) multikey-quick(buff, m2, high, n)
マルチキークイックソートは再帰呼び出しを使うと簡単にプログラムできます。引数 buff は文字列を格納する配列です。区間を引数 low と high で表し、比較する文字の位置を n で表します。区間の要素数が一定の個数 (LIMIT) 以下になったら、単純なソートアルゴリズムに切り替えます。この方が少しだけ速くなります。
n 番目の文字で区間を三分割したら、multikey_quicksort を再帰呼び出しします。このとき、枢軸より小さい区間 (low, m1-1) と大きい区間 (m2, high) は n 番目の文字でソートを続行します。等しい区間 (m1, m2-1) は n + 1 番目の文字へ進めてソートを行います。もしも、等しい区間の文字 (枢軸) が文字列の終端 (たとえば改行文字 '\n') であれば、文字列を最後まで比較したので再帰呼び出しは行いません。つまり、同じ文字列が複数個あるということです。
マルチキークイックソートが高速なのは、このアルゴリズムに秘密があります。最初のポイントは、文字列ではなく文字を比較することです。当然ですが文字列よりも文字 (整数) の比較の方が高速ですね。したがって、文字列を比較して区間を分割するよりも高速に分割することができます。
第 2 のポイントが、区間を三分割して枢軸と等しい値を集めることです。これにより、基数ソート (基数交換法) と同等の効果を生み出しています。つまり、区間を分割するたびに同じ文字をひとつの区間に集めているのです。そして、同じ文字の区間であれば、次の文字でソートを行います。この動作は基数交換法と同じです。これらの効果により、マルチキークイックソートは文字列を高速にソートすることができるのです。
ところで、基数交換法 (MSD radix sort) も文字列に適した高速なソートアルゴリズムですが、作業用のメモリが必要になるため、クイックソートよりも使いにくいのが欠点です。プログラムは簡単に作成できるので、あとで試してみましょう。
マルチキークイックソートのプログラムは、ズバリ 「区間を三分割する処理」 がポイントになります。この処理のよしあしによって実行時間は大きく左右されますが、心配する必要はありせん。Bentley と Sedgewick が論文で効率の良い方法を示しています。次の図を見てください。
L R 左側から枢軸 (2) 以上の値を探し、 3 2 1 3 2 2 1 3 2 1 右側から枢軸以下の値を探す M N L R 交換して L, R を進める 1 2 1 3 2 2 1 3 2 3 枢軸と等しい値は端へ移動する M N L R 2 1 1 3 2 2 1 3 3 2 L と M, R と N を交換して進める M N L R 2 1 1 3 2 2 1 3 3 2 L と R 交換して進める M N L R 2 1 1 1 2 2 3 3 3 2 L と M, R と N を交換して進める M N R L L と R が交差したら 2 2 1 1 1 3 3 3 2 2 枢軸と等しい値を中央に集める M N 1 1 1 2 2 2 2 3 3 3 分割終了 図 : 区間の三分割
基本的にはクイックソートと同様に、左端から枢軸以上の要素を探し、右端から枢軸以下の要素を探して、それを交換することで区間を分割します。このとき、枢軸と等しいデータは一時的に両端へ集めるところがポイントです。つまり、区間を等しい要素、小さい要素、大きい要素、等しい要素の 4 つに分割するのです。分割が終了したら、等しいデータを中央に集めます。これはデータを移動するだけなので簡単です。これで区間を三分割することができます。
それではプログラムを作りましょう。マルチキークイックソートのプログラムは次のようになります。
リスト : マルチキークイックソート # マルチキークイックソート def mquick_sort(buff, low, high, n = 0): if high - low <= LIMIT: insert_sort1(buff, low, high, n) else: pivot = select_pivot_m(buff, low, high, n) i = m1 = low j = m2 = high # 区間 (low, high) を 4 分割する while True: while i <= j: k = buff[i][n] if k > pivot: break if k == pivot: swap(buff, i, m1) m1 += 1 i += 1 while i <= j: k = buff[j][n] if k < pivot: break if k == pivot: swap(buff, j, m2) m2 -= 1 j -= 1 if i > j: break swap(buff, i, j) i += 1 j -= 1 # 枢軸と等しいデータ (左端) を中央に集める for k in xrange(min(m1 - low, i - m1)): swap(buff, low + k, j - k) m1 = low + (i - m1) # 枢軸と等しいデータ (右端) を中央に集める for k in xrange(min(high - m2, m2 - j)): swap(buff, i + k, high - k) m2 = high - (m2 - j) + 1 # 枢軸より小さい区間をソート if low < m1: mquick_sort(buff, low, m1 - 1, n) # 枢軸と等しい区間をソート if m1 < m2 and buff[m1][n] != '\n': mquick_sort(buff, m1, m2 - 1, n + 1) # 枢軸より大きい区間をソート if m2 <= high: mquick_sort(buff, m2, high, n)
関数 mquick_sort の引数 buff がソートする配列で、low と high が区間を表し、n が比較する文字の位置を表します。区間が LIMIT (10) 以下になったならば挿入ソート (insert_sort1) に切り替えます。枢軸は関数 select_pivot_m で選び、変数 pivot にセットします。枢軸は low, (low + high) / 2, high の 3 か所の文字を比較して中央の値を返します。そして、最初の while ループで区間を 4 分割します。
次の while ループで、左端から枢軸よりも大きな要素を探します。枢軸と等しい要素を見つけたら、それを左端へ移動して探索を続行します。その次の while ループで、右端から枢軸よりも大きな要素を探します。枢軸と等しい要素を端へ移動する処理があるため、枢軸を番兵として使うことができません。このため、while の条件部で i <= j をチェックしています。i と j が交差したら、分割処理を終了します。
この段階で区間は 4 分割されています。次に、両端に集めた枢軸と等しいデータを中央へ移動します。次の図を見てください。
(1) m1 j i m1 j i 2 2 1 1 1 1 1 3 ..... => 1 1 1 1 1 2 2 3 ..... ---- ---- 交換する (2) m1 j i m1 j i 2 2 2 2 1 1 1 3 ..... => 1 1 1 2 2 2 2 3 ..... ------- ------- 交換する 図 : 枢軸と等しい値を中央に集める
狭い区間のデータ数を N とすると、左端 (low) と右端 (j) からデータを N 個交換するだけです。プログラムでは、関数 min で狭い区間のデータ数を求め、関数 swap でデータを交換しています。m1 は小さなデータの個数に更新します。同じように、右端に集めたデータも中央へ移動させます。これで区間を 3 分割することができます。
あとは mquick_sort を再帰呼び出しするだけです。このとき、枢軸と等しいデータの区間は、文字列の終端 ('\n') をチェックすることをお忘れなく。また、この区間の再帰呼び出しは、とても簡単に繰り返しに変換することができます。興味のある方は試してみてください。
それでは、実行結果を示します。テストデータは文字 (アルファベットと数字) をランダムで選んで作成しました。1 行 8 文字で、1000, 2000, 4000, 8000, 16000, 32000 行の 6 種類のデータを用意しました。行の最後に改行文字 ('\n') が付いていないと、今回作成したマルチキークイックソートは正常に動作しません。ご注意ください。
実行時間を比較するため、3 種類のプログラムを用意しました。
ここで、一つだけ注意点があります。Python の場合、文字列データは数値データと同じ比較演算子で比較することができます。今まで作成したソートプログラムは、そのまま文字列のソートに適用することができるのですが、そうすると文字列の比較が組み込み演算子で高速に行われるため、平等なテストにはなりません。そこで、文字列の比較には次に示す関数を使うことにします。
リスト : 文字列の比較 def compare(s1, s2, n = 0): while True: if s1[n] < s2[n]: return -1 elif s1[n] > s2[n]: return 1 elif s1[n] == '\n': break n += 1 return 0
関数 compare は文字列を 1 文字ずつ比較して、その結果を -1, 0, 1 で返します。挿入ソートとクイックソートは compare でデータを比較するように修正しています。プログラムの詳細は プログラムリスト2 をお読みください。
ソート処理の実行時間を計測したところ、結果は次のようになりました。ファイルの入出力処理は計測時間に含まれていません。ご注意くださいませ。
表 : 実行結果 (単位 : 秒) 個数 : quick radix mquick ----------------------------- 1000 : 0.024 0.022 0.013 2000 : 0.049 0.026 0.027 4000 : 0.112 0.037 0.058 8000 : 0.243 0.066 0.123 16000 : 0.553 0.126 0.261 32000 : 1.247 0.395 0.574 実行環境 : Windows XP, celeron 1.40 GHz, Python 2.4.2
乱数データの場合、マルチキークイックソートはクイックソートよりも高速ですが、MSD radix sort にはかないませんでした。もっとも、今回のデータが 1 行 8 桁の文字列なので、MSD radix sort には有利なテストだったと思います。ほかのデータでも試してみないと、正しい評価はできないでしょう。
ところで、今回の MSD radix sort は再帰呼び出しを使っているため、文字列が長くなるとスタックオーバーフローの危険性があります。また、作業用のメモリも必要となるため、実際にはちょっと使いにくいソートでしょう。その点では、マルチキークイックソートの方が使いやすくて高速なソートアルゴリズムだと思います。
なお、これらの結果は M.Hiroi のコーディング、実行したマシン、プログラミング言語などの環境に大きく依存しています。また、これらの環境だけではなく、データの種類によっても実行時間はかなり左右されます。興味のある方は、いろいろなデータをご自分の環境で試してみてください。ご参考までにC言語のプログラムを Appendix に示します。よろしければお読みくださいませ。
# coding: utf-8 # # stringsort.py : 文字列のソート # # Copyright (C) 2007 Makoto Hiroi # # 文字列の比較 def compare(s1, s2, n = 0): while True: if s1[n] < s2[n]: return -1 elif s1[n] > s2[n]: return 1 elif s1[n] == '\n': break n += 1 return 0 # 挿入ソート (1) def insert_sort(buff): k = len(buff) for i in xrange(1, k): temp = buff[i] j = i - 1 while j >= 0 and compare(temp, buff[j]) < 0: buff[j + 1] = buff[j] j -= 1 buff[j + 1] = temp # 挿入ソート (2) def insert_sort1(buff, low, high, n = 0): for i in xrange(low + 1, high + 1): temp = buff[i] j = i - 1 while j >= low and compare(temp, buff[j], n) < 0: buff[j + 1] = buff[j] j -= 1 buff[j + 1] = temp # 枢軸の選択 (quick sort 用) def select_pivot(buff, low, high): a = buff[low] b = buff[(low + high) / 2] c = buff[high] if compare(a, b) > 0: tmp = a a = b b = tmp if compare(b, c) > 0: b = c if a > b: b = a return b # クイックソート LIMIT = 10 END = ord('\n') def quick_sort(buff, low, high): stack = [] while True: if high - low <= LIMIT: if len(stack) == 0: break low, high = stack.pop() pivot = select_pivot(buff, low, high) i = low j = high while True: while compare(pivot, buff[i]) > 0: i += 1 while compare(pivot, buff[j]) < 0: j -= 1 if i >= j: break temp = buff[i] buff[i] = buff[j] buff[j] = temp i += 1 j -= 1 # if i - low > high - j: if i - low > LIMIT: stack.append((low, i - 1)) low = j + 1 else: if high - j > LIMIT: stack.append((j + 1, high)) high = i - 1 insert_sort(buff) # 基数ソート (MSD radix sort) def radix_sort(buff, low, high, n = 0): if high - low <= LIMIT: insert_sort1(buff, low, high, n) else: count = [0] * 257 # for i in xrange(low, high + 1): count[ord(buff[i][n])] += 1 for i in xrange(1, 256): count[i] += count[i - 1] for i in xrange(high, low - 1, -1): c = ord(buff[i][n]) count[c] -= 1 work[count[c] + low] = buff[i] for i in xrange(low, high + 1): buff[i] = work[i] # count[256] = high - low + 1 for i in xrange(0, 256): l = low + count[i] h = low + count[i + 1] - 1 if i != END and l < h: radix_sort(buff, l, h, n + 1) # 枢軸の選択 (multikey quick sort 用) def select_pivot_m(buff, low, high, n): a = buff[low][n] b = buff[(low + high) / 2][n] c = buff[high][n] if a > b: tmp = a a = b b = tmp if b > c: b = c if a > b: b = a return b # データの交換 def swap(buff, x, y): temp = buff[x] buff[x] = buff[y] buff[y] = temp # マルチキークイックソート def mquick_sort(buff, low, high, n = 0): if high - low <= LIMIT: insert_sort1(buff, low, high, n) else: pivot = select_pivot_m(buff, low, high, n) i = m1 = low j = m2 = high while True: while i <= j: k = buff[i][n] if k > pivot: break if k == pivot: swap(buff, i, m1) m1 += 1 i += 1 while i <= j: k = buff[j][n] if k < pivot: break if k == pivot: swap(buff, j, m2) m2 -= 1 j -= 1 if i > j: break swap(buff, i, j) i += 1 j -= 1 # for k in xrange(min(m1 - low, i - m1)): swap(buff, low + k, j - k) m1 = low + (i - m1) for k in xrange(min(high - m2, m2 - j)): swap(buff, i + k, high - k) m2 = high - (m2 - j) + 1 # if low < m1: mquick_sort(buff, low, m1 - 1, n) if m1 < m2 and buff[m1][n] != '\n': mquick_sort(buff, m1, m2 - 1, n + 1) if m2 <= high: mquick_sort(buff, m2, high, n) # TEST for name in ['s1.dat', 's2.dat', 's4.dat', 's8.dat', 's16.dat', 's32.dat']: f = open(name, 'r') buff = f.readlines() print name, for func in [quick_sort, radix_sort, mquick_sort]: a = buff[:] b = buff[:] work = [0] * len(buff) # sort s = time.clock() func(a, 0, len(buff) - 1) e = time.clock() b.sort() if a != b: print 'error' print "%.3f" % (e - s), print
ご参考までにC言語のプログラムと実行時間を示します。データは 1 行 8 文字で、100000, 200000, 400000, 800000 行の 4 種類のデータを用意しました。
リスト : C言語のプログラム /* * sort.c : 文字列のソート * * Copyright (C) 2007 Makoto Hiroi */ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <ctype.h> #include <time.h> #define MAX_SIZE 10000000 #define MAX_LINE 1000000 #define LIMIT 10 #define CODE_SIZE 256 #define TRUE 1 #define FALSE 0 /* マクロ関数 */ #define MIN(a, b) ((a) < (b) ? (a) : (b)) /* 交換マクロ */ #define SWAP(buff, i, j) {\ char *_tmp = buff[i];\ buff[i] = buff[j]; \ buff[j] = _tmp; \ } /* グローバル変数 */ char *buffer; char **line; char **work; /* 単純挿入ソート */ void insert_sort(char **buff, int low, int high, int n) { int i, j; for(i = low + 1; i <= high ; i++){ char *tmp = buff[i]; for( j = i - 1; j >= low && strcmp(tmp + n, buff[j] + n) < 0; j-- ){ buff[j + 1] = buff[j]; } buff[j + 1] = tmp; } } /***** Multikey QuickSort *****/ /* 枢軸の選択 */ int select_pivot_m(char **buff, int low, int high, int n) { int a = *(buff[low] + n); int b = *(buff[(low + high) / 2] + n); int c = *(buff[high] + n); if(a > b){ int temp = a; a = b; b = temp; } if(b > c){ b = c; if(a > b) b = a; } return b; } void mquick_sort(char **buff, int low, int high, int n) { if(high - low <= LIMIT){ /* 単純挿入ソート(ソートを完了する) */ insert_sort(buff, low, high, n); } else { int pivot, i, j, k, l, m1, m2; pivot = select_pivot_m(buff, low, high, n); /* 枢軸の選択 */ /* 最初は 4 分割 */ i = m1 = low; j = m2 = high; for(;;){ while(i <= j){ k = *(buff[i] + n) - pivot; if(k > 0) break; if(!k){ /* 枢軸と同じ値を端へ移動 */ SWAP(buff, i, m1); m1++; } i++; } while(i <= j){ k = *(buff[j] + n) - pivot; if(k < 0) break; if(!k){ /* 枢軸と同じ値を端へ移動 */ SWAP(buff, j, m2); m2--; } j--; } if(i > j) break; SWAP(buff, i, j); i++; j--; } /* 左端のデータを中央に移動 */ k = MIN(m1 - low, i - m1); for(l = 0; l < k; l++) SWAP(buff, low + l, j - l); m1 = low + (i - m1); /* 右端のデータを中央に移動 */ k = MIN(high - m2, m2 - j); for(l = 0; l < k; l++) SWAP(buff, i + l, high - l); m2 = high - (m2 - j) + 1; /* 小さい区間をソート */ if(low < m1) mquick_sort(buff, low, m1 - 1, n); /* 等しい区間をソート ('\0' ならばソート終了) */ if(m1 < m2 && *(buff[m1] + n) != '\0' ){ mquick_sort(buff, m1, m2 - 1, n + 1); } /* 大きい区間をソート */ if(m2 <= high) mquick_sort(buff, m2, high, n); } } /***** 基数交換法 (MSD radix sort) *****/ void radix_sort(char **buff, int low, int high, int n) { int i, count[CODE_SIZE + 1]; /* 分布数えソート */ for(i = 0; i < CODE_SIZE; i++) count[i] = 0; for(i = low; i <= high; i++) count[*(buff[i] + n)]++; for(i = 1; i < CODE_SIZE; i++) count[i] += count[i - 1]; for(i = high; i >= low; i--){ int c = *(buff[i] + n); work[--count[c] + low] = buff[i]; } /* 元に戻す */ for(i = low; i <= high; i++) buff[i] = work[i]; /* 文字ごとにソート 0 ('\0') は文字列の終端だからソートの必要無し */ count[CODE_SIZE] = high - low + 1; for(i = 1; i < CODE_SIZE; i++){ int l = low + count[i]; int h = low + count[i + 1] - 1; if(l < h){ if(h - l <= LIMIT){ insert_sort(buff, l, h, n + 1); } else { radix_sort(buff, l, h, n + 1); } } } } /***** クイックソート *****/ #define STACK_SIZE 32 /* 枢軸の選択 */ char *select_pivot(char **buff, int low, int high) { char *a = buff[low]; char *b = buff[(low + high) / 2]; char *c = buff[high]; if(strcmp(a, b) > 0){ char *temp = a; a = b; b = temp; } if(strcmp(b, c) > 0){ b = c; if(strcmp(a, b) > 0) b = a; } return b; } void quick_sort(char **buff, int size) { int low_stack[STACK_SIZE]; int high_stack[STACK_SIZE]; int low = 0; int high = size - 1; int p = 0; while(TRUE){ if(high - low <= LIMIT){ if(!p) break; low = low_stack[--p]; high = high_stack[p]; } else { char *pivot = select_pivot(buff, low, high); int i = low, j = high; while(TRUE){ char *temp; while(strcmp(pivot, buff[i]) > 0) i++; while(strcmp(pivot, buff[j]) < 0) j--; if(i >= j) break; temp = buff[i]; buff[i] = buff[j]; buff[j] = temp; i++; j--; } if(i - low > high - j){ low_stack[p] = low; high_stack[p++] = i - 1; low = j + 1; } else { low_stack[p] = j + 1; high_stack[p++] = high; high = i - 1; } } } insert_sort(buff, 0, size - 1, 0); } /* ファイルを読み込む */ int read_file(void) { char *p = buffer; int i = 0, size = fread(buffer, 1, MAX_SIZE, stdin); buffer[size] = '\n'; do { if(i >= MAX_LINE){ fprintf(stderr, "行数が多すぎます\n"); exit(1); } line[i++] = p; p = strchr(p, '\n'); *p++ = '\0'; } while(p < buffer + size); return i; } /* ソート結果を出力 */ void write_file(int size) { int i; for(i = 0; i < size; i++) puts(line[i]); } #define MQ 1 #define QS 2 #define RS 3 /* 引数解析 */ int getargs(int argc, char **argv) { int i, flag = FALSE; for(i = 1; i < argc && argv[i][0] == '-'; i++){ int opt = toupper(argv[i][1]); switch(opt){ case 'M': flag = MQ; break; /* multikey quick sort */ case 'Q': flag = QS; break; /* quick sort */ case 'R': flag = RS; break; /* radix sort */ default : fprintf(stderr, "不正なオプションです\n"); exit(1); } } if(!flag){ fprintf(stderr, "ソート方法 (-m or -q or -r) を指定してください\n"); exit(1); } return flag; } int main(int argc, char **argv) { int flag, size, e, s; buffer = malloc(sizeof(char) * MAX_SIZE + 1); line = malloc(sizeof(char *) * MAX_LINE); work = malloc(sizeof(char *) * MAX_LINE); if(buffer == NULL || line == NULL || work == NULL){ fprintf(stderr, "Out of Memory\n"); exit(1); } flag = getargs(argc, argv); size = read_file(); s = clock(); switch(flag){ case MQ: fprintf(stderr, "Multikey Quicksort "); mquick_sort(line, 0, size - 1, 0); break; case QS: fprintf(stderr, "Quicksort "); quick_sort(line, size); break; case RS: fprintf(stderr, "MSD Radix sort "); radix_sort(line, 0, size - 1, 0); break; } e = clock(); write_file(size); fprintf(stderr, "size = %d, time = %d\n", size, e - s); free(buffer); free(line); free(work); return 0; }
表 : 実行結果 (単位 : 秒) 個数 : quick mquick radix ------------------------------ 100000 : 0.047 0.032 0.031 200000 : 0.156 0.078 0.063 400000 : 0.407 0.204 0.141 800000 : 0.985 0.469 0.396 実行環境 : Windows XP, celeron 1.40 GHz, Borland C++ 5.5.1 for Win32