表記の件について、JavaのListから作るStreamは操作中に変更をするとその後の挙動についてなんら結果を保証しないので、ListからStreamを作った場合は、変更をしないか、あるいは不可変なListに変換してから作るのが良いです。
と、書いておいてからなのですが、こんなツイート見かけました。
これは思わぬ結果
ステートフルな中間操作のsorted()かますと
最終的にListは空になるけどforEachで全て出力される。
なんでだろう?
これで今日も眠れないw pic.twitter.com/864kLHyK8K
— Yucchi (@Yucchi_jp) 2015, 5月 8
peekでListの要素を削除するというやってはいけないパターンです。
@Test public void intList() { List<Integer> list = IntStream.range(0, 10) .boxed().collect(toList()); list.stream() .sorted() .peek(list::remove) .forEach(System.out::println); }
これを実行するとどうなるでしょうか?
ConcurrentModificationException- 0〜9が表示される
- 0〜10が表示される
- 何も表示されない
ConcurrentModificationException
さて、上記のコードを以下のように変更するとConcurrentModificationExceptionが発生します。
@Test public void sample() { List<Integer> list = IntStream.range(0, 9) .boxed() .collect(toList()); list.stream() .limit(7) .peek(list::remove) .forEach(System.out::println); }
実行結果
0
java.util.ConcurrentModificationException
at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1353)
at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:529)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:516)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:502)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
この差は何によるのでしょうか?
Sink
ポイントはSinkというConsumerを拡張したインターフェースの実装にあります。
Streamは各操作ごとにオブジェクトが作成されていて、それがリンクドリスト状に連なっていきます。それをストリーム・パイプラインと呼びます。そして、終端操作(forEachやcollectなど)が呼び出された時に、ストリーム・パイプラインの中でデータやデータ量などの情報を運ぶのがSinkです。Sinkは状態を二つ持っています。実行可能状態と初期状態の二つです。初期状態→begin(int)(操作開始)→実行可能→accept(T)(操作)→end()(操作終了)→初期状態の順で状態が変わります。Sink自体はインターフェースなのでそのように呼び出すという契約となっています。
ここで各操作のクラスが実装しなければならないメソッドに次のメソッドがあります。
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink);
このメソッドの引数のSink<R>は次の操作、戻り値のSink<P_OUT>は実装するメソッドが提供する操作です。
普通のSinkの実装
ではlimitメソッドでは次のような実装になっています。
ReferencePipeline リンク
@Override public final Stream<P_OUT> limit(long maxSize) { if (maxSize < 0) throw new IllegalArgumentException(Long.toString(maxSize)); return SliceOps.makeRef(this, 0, maxSize); }
SliceOps.javaというのがlimit操作を提供するクラスです。
その中で、opWrapSinkメソッドをこのように実装しています。
@Override Sink<T> opWrapSink(int flags, Sink<T> sink) { return new Sink.ChainedReference<T, T>(sink) { long n = skip; long m = limit >= 0 ? limit : Long.MAX_VALUE; @Override public void begin(long size) { downstream.begin(calcSize(size, skip, m)); } @Override public void accept(T t) { if (n == 0) { if (m > 0) { m--; downstream.accept(t); } } else { n--; } } @Override public boolean cancellationRequested() { return m == 0 || downstream.cancellationRequested(); } }; }
ここで実装されるSinkのacceptで次の操作(downstream)の操作が呼び出されています。ほぼほとんどのSinkの実装はこのようになっています。
sortedでのSinkの実装
一方、sortedの実装は次のようになっています。
@Override public final Stream<P_OUT> sorted() { return SortedOps.makeRef(this); }
SortedOps#makeRefは次のようになっています。
static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) { return new OfRef<>(upstream); }
SortedOps.OfRefというのがsort操作を提供するクラスです。
その中で、opWrapSinkメソッドはこのように実装されています。
@Override public Sink<T> opWrapSink(int flags, Sink<T> sink) { Objects.requireNonNull(sink); // If the input is already naturally sorted and this operation // also naturally sorted then this is a no-op if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) return sink; else if (StreamOpFlag.SIZED.isKnown(flags)) return new SizedRefSortingSink<>(sink, comparator); else return new RefSortingSink<>(sink, comparator); }
ここで、flagsの値にはCollection#spliterator()リンク -> Spliterators#spliterator(Collection, int)リンク -> IteratorSpliterator(Collection, int)リンクと辿って行くと、List#streamから生成されるStreamの特性にSpliterator.SIZEDとSpliterator.SUBSIZEDが与えられていることがわかります。
また、コードのコメントを信じると(あかん…)、まだソートされていないので、StreamOpFlag.SORTED.isKnown(flags)はfalseになると考えられるので(もっとソース嫁)、SizedRefSortingSinkが実装クラスであることがわかります。
そして、SizedRefSortingSinkクラスは次のように実装されています。
private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> { private T[] array; private int offset; SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { super(sink, comparator); } @Override @SuppressWarnings("unchecked") public void begin(long size) { if (size >= Nodes.MAX_ARRAY_SIZE) throw new IllegalArgumentException(Nodes.BAD_SIZE); array = (T[]) new Object[(int) size]; } @Override public void end() { Arrays.sort(array, 0, offset, comparator); downstream.begin(offset); if (!cancellationWasRequested) { for (int i = 0; i < offset; i++) downstream.accept(array[i]); } else { for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) downstream.accept(array[i]); } downstream.end(); array = null; } @Override public void accept(T t) { array[offset++] = t; } }
ここで注目すべきはacceptメソッドであって、このメソッドではdownstreamの操作は行われません。downstreamの操作はendメソッドで一気に行われます。ここで先ほど書いたSinkの状態を思い出してください。このSizedRefSortingSinkの操作終了(end())が呼び出されて初めて次の操作が開始するのです。したがって、sorted以降のストリーム・パイプラインで記述された操作は、sortedが終了してから初めて行われるのです。そして、mapにおけるSinkの実装で記述したようにacceptメソッドで次の操作(downstream)のacceptが呼び出されるようになっていますので、sorted以降のストリーム・パイプラインの一連の操作はarrayの順番で呼び出されるようになります。すると、最初に引用したツイートのような事象も納得が行くようになります。
Streamの実行
Streamの操作は次のように実行されます(詳しくはForEachOps.ForEachOpのコードや、ReduceOpsのコードや、ArrayList.ArrayListSpliteratorのコードやHashMap.KeySpliteratorのコードなどを参照)。
- 終端操作に与えられた操作をラップした
Sinkが作成される - 一つ前の操作の
opWrapSinkでSinkをラップする(操作を合成する) - すべての中間操作の
opWrapSinkでSinkをラップする(操作を合成する) - 3.で得た
Sink(全操作合成済み)のbegin(long)を呼び出す(すべての操作のSinkに波及する) - ソースの
Spliterator#tryAdvance(Consumer)がtrueを返し続ける間、3.で得たSink(全操作合成済み)を渡して処理をおこなう tryAdvanceでは元のコレクションに変更がないか検査して、変更があった場合はConcurrentModificationExceptionを投げるtryAdvanceですべての要素が処理されたか検査して、すべて操作された場合はfalseを返す- 3.で得た
Sink(全操作合成済み)のend()を呼び出す
一方でsortedが挟まった場合のStreamは次のように実行されます。
- 上記の3.で得た
Sinkのbegin(long)を呼び出す(sortedでbeginの波及が止まる) - ソースの
Spliterator#tryAdvance(Consumer)がtrueを返し続ける間、上記の3.で得たSinkを渡して処理をおこなう(sortedで処理が中断される) tryAdvanceでは元のコレクションに変更がないか検査して、変更があった場合はConcurrentModificationExceptionを投げるtryAdvanceですべての要素が処理されたか検査して、すべて操作された場合はfalseを返す- 上記の3.で得た
Sinkのend()を呼び出す sortedのend()にて並び替えが行われるsorted以降のSinkのbegin()が呼び出される- 並べ替えが完了した要素の順番で
Sink#acceptが適用される sorted以降のSinkのend()が呼び出される
Sinkの実装によりストリームのソースのtryAdvance(正確には変更の検査)が呼び出される順番と操作の順番が入れ替わるために、冒頭の引用のような状況が発生するわけです。
まとめ
- ストリームのメソッド一つ一つで
Streamを実装したクラス(正確にはAbstractPipelineを継承したクラス)が作成される Sinkというインターフェースにすべての操作が合成されていく- ソース(
Spliterator)のtryAdvanceにSinkが渡されることで要素1つずつが順番に実行される sortedされるとそれ以降の操作順の制御はSinkがおこなう
前々からStreamはどのように実装されているのか、どのように遅延評価されているのか興味があったので、いい勉強になった。特にストリーム・パイプラインが双方向リンクドリストで実装されているのがわかったのが収穫だった。というか、今なら遅延評価するオレオレOptionalが書ける気がする。