Asked  7 Months ago    Answers:  5   Viewed   31 times

I have the following sample code:

System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);
System.out.println("-----------");
System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);

The output is as follows:

1
Result: 1
-----------
-1
0
1
0
1
2
1
2
3
Result: -1

From this I see that in the first case the stream really behaves lazily: we use findFirst(), so once we have the first element our filtering lambda is not invoked again. However, in the second case, which uses flatMap, although the first element that fulfils the filter condition has already been found (it is simply the first element, since the lambda always returns true), further contents of the stream are still fed through the filtering function.

I am trying to understand why it behaves like this rather than stopping after the first element is calculated, as in the first case. Any helpful information would be appreciated.

 Answers

47

TL;DR, this has been addressed in JDK-8075939 and fixed in Java 10 (and backported to Java 8 in JDK-8225328).

Looking into the implementation (ReferencePipeline.java), we see the method

@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}

which will be invoked for the findFirst operation. The thing to note is sink.cancellationRequested(), which allows the loop to end on the first match. Compare this to

@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);
    // We can do better than this, by polling cancellationRequested when stream is infinite
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    try (Stream<? extends R> result = mapper.apply(u)) {
                        // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
                        if (result != null)
                            result.sequential().forEach(downstream);
                    }
                }
            };
        }
    };
}

The method for advancing one item ends up calling forEach on the sub-stream without any possibility of earlier termination, and the comment at the beginning of the flatMap method even mentions this missing feature.
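
The effect described above can be observed without reading the JDK source. The following is a minimal sketch, not part of the original answer: the class name FlatMapProbe and the helper elementsPulled are made up for illustration. It counts how many elements of a finite sub-stream are pulled before findFirst() returns. On a JDK without the fix one would expect the whole sub-stream to be traversed; on a fixed JDK the count should be much smaller.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class FlatMapProbe {

    // Hypothetical helper: returns how many sub-stream elements were
    // pulled through peek() before findFirst() produced its result.
    static int elementsPulled(int subStreamSize) {
        AtomicInteger pulled = new AtomicInteger();
        Optional<Integer> first = Stream.of("x")
                .flatMap(x -> Stream.iterate(0, i -> i + 1)
                        .limit(subStreamSize)              // keep the sub-stream finite
                        .peek(i -> pulled.incrementAndGet()))
                .findFirst();
        if (!first.isPresent() || first.get() != 0) {
            throw new IllegalStateException("unexpected result: " + first);
        }
        return pulled.get();
    }

    public static void main(String[] args) {
        // The printed number depends on the JDK in use.
        System.out.println("Elements pulled: " + elementsPulled(1000));
    }
}
```

The sub-stream is deliberately limited so that the probe terminates on any JDK version.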

Since this is more than just an optimization, as it implies that the code simply breaks when the sub-stream is infinite, I hope that the developers soon prove that they “can do better than this”…


To illustrate the implications, while Stream.iterate(0, i->i+1).findFirst() works as expected, Stream.of("").flatMap(x->Stream.iterate(0, i->i+1)).findFirst() will end up in an infinite loop.
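
For a JDK without the fix, one possible workaround is to flatten through iterators instead of flatMap. This is only a sketch under assumptions: lazyFlatMap is a hypothetical helper, not a JDK API, it produces a sequential stream only, and unlike flatMap it does not close the sub-streams.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LazyFlatMap {

    // Hypothetical helper: flattens sub-streams one element at a time.
    // Caveats: sequential only, and sub-streams are never closed.
    static <T, R> Stream<R> lazyFlatMap(
            Stream<T> source,
            Function<? super T, ? extends Stream<? extends R>> mapper) {
        Iterator<T> outer = source.iterator();
        Iterator<R> flattened = new Iterator<R>() {
            private Iterator<? extends R> inner = Collections.emptyIterator();

            @Override
            public boolean hasNext() {
                // Move on to the next non-empty sub-stream only when needed.
                while (!inner.hasNext()) {
                    if (!outer.hasNext()) {
                        return false;
                    }
                    Stream<? extends R> sub = mapper.apply(outer.next());
                    if (sub != null) {
                        inner = sub.iterator();
                    }
                }
                return true;
            }

            @Override
            public R next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return inner.next();
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(flattened, Spliterator.ORDERED),
                false);
    }

    // The infinite sub-stream from the text above, flattened lazily.
    static Optional<Integer> firstOfInfinite() {
        return lazyFlatMap(Stream.of(""), x -> Stream.iterate(0, i -> i + 1))
                .findFirst();
    }

    public static void main(String[] args) {
        System.out.println(firstOfInfinite()); // prints Optional[0]
    }
}
```

Because findFirst() advances the iterator-backed spliterator one element at a time, only the first element of the infinite sub-stream is requested.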

Regarding the specification, most of it can be found in the chapter “Stream operations and pipelines” of the package specification:

Intermediate operations return a new stream. They are always lazy;

… Laziness also allows avoiding examining all the data when it is not necessary; for operations such as "find the first string longer than 1000 characters", it is only necessary to examine just enough strings to find one that has the desired characteristics without examining all of the strings available from the source. (This behavior becomes even more important when the input stream is infinite and not merely large.)

Further, some operations are deemed short-circuiting operations. An intermediate operation is short-circuiting if, when presented with infinite input, it may produce a finite stream as a result. A terminal operation is short-circuiting if, when presented with infinite input, it may terminate in finite time. Having a short-circuiting operation in the pipeline is a necessary, but not sufficient, condition for the processing of an infinite stream to terminate normally in finite time.

It’s clear that a short-circuiting operation doesn’t guarantee termination in finite time, e.g. when a filter doesn’t match any item the processing can’t complete, but an implementation which doesn’t support any termination in finite time, by simply ignoring the short-circuiting nature of an operation, is far off the specification.
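
As a small illustration of the quoted passage, here is a sketch (the class and method names are invented for this example) in which short-circuiting operations let pipelines over an infinite source finish. The never-matching case is mentioned only in a comment, since running it would not return.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ShortCircuitDemo {

    // findFirst() is a short-circuiting terminal operation: the infinite
    // source is consumed only until the filter matches once.
    static int firstAbove(int threshold) {
        return Stream.iterate(0, i -> i + 1)
                .filter(i -> i > threshold)
                .findFirst()
                .get();
    }

    // limit() is a short-circuiting intermediate operation: it turns the
    // infinite stream into a finite one.
    static List<Integer> firstN(int n) {
        return Stream.iterate(0, i -> i + 1)
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstAbove(1000)); // prints 1001
        System.out.println(firstN(3));        // prints [0, 1, 2]
        // Not executed: with a filter such as i -> false, findFirst()
        // would never return, because no element ever matches.
    }
}
```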

Tuesday, June 1, 2021
 
dirigibleplum
answered 7 Months ago
91

https://developer.mozilla.org/en/JavaScript/Reference/Global_Objects/NaN

NaN is a property of the global object.

The initial value of NaN is Not-A-Number — the same as the value of Number.NaN. In modern browsers, NaN is a non-configurable, non-writable property. Even when this is not the case, avoid overriding it.

https://developer.mozilla.org/en/JavaScript/Reference/Global_Objects/undefined

undefined is a property of the global object, i.e. it is a variable in global scope.

The initial value of undefined is the primitive value undefined.

Sunday, July 4, 2021
 
bancer
answered 5 Months ago
98

This should do the trick:

objects.stream().map(MyObj::getId).collect(Collectors.toList());

That said, the method reference operator :: lets you refer to an existing method and use it in place of a lambda for the operation you need.

As mentioned in the comments, a stream over an ordered source such as a List preserves the encounter order.
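
A self-contained sketch of the one-liner above. MyObj and its getId accessor are assumed here, since the original question's class is not shown; the output illustrates that the list order is kept.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class IdExtraction {

    // Hypothetical stand-in for the class from the question.
    public static final class MyObj {
        private final int id;

        public MyObj(int id) {
            this.id = id;
        }

        public int getId() {
            return id;
        }
    }

    // Maps each object to its id; the resulting list follows the
    // encounter order of the input list.
    static List<Integer> ids(List<MyObj> objects) {
        return objects.stream()
                .map(MyObj::getId)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<MyObj> objects = Arrays.asList(new MyObj(3), new MyObj(1), new MyObj(2));
        System.out.println(ids(objects)); // prints [3, 1, 2]
    }
}
```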

Friday, July 23, 2021
 
shin
answered 5 Months ago
44

Something like this, if you're bound to use Stream.generate specifically:

IntStream inStream = Stream.generate(new AtomicInteger(1)::getAndIncrement)
        .limit(10)
        .mapToInt(t -> t);
inStream.forEach(System.out::println);

Edit: Using IntStream.generate, you can write it as

IntStream.generate(new AtomicInteger(1)::getAndIncrement).limit(10);

Note: A better solution in terms of the API design would definitely be to make use of Stream.iterate for such a use case.
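
A possible shape of the Stream.iterate variant mentioned in the note, as a sketch (the class and method names are invented for this example). It avoids the shared AtomicInteger because each element is derived from the previous one.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class IterateDemo {

    // Boxed variant: 1, 2, ..., 10 via Stream.iterate.
    static List<Integer> firstTen() {
        return Stream.iterate(1, i -> i + 1)
                .limit(10)
                .collect(Collectors.toList());
    }

    // Primitive variant: the same sequence as an int[].
    static int[] firstTenInts() {
        return IntStream.iterate(1, i -> i + 1)
                .limit(10)
                .toArray();
    }

    public static void main(String[] args) {
        System.out.println(firstTen()); // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        IntStream.of(firstTenInts()).forEach(System.out::println);
    }
}
```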

Monday, October 11, 2021
 
akohout
answered 2 Months ago
59

I found the answer and am posting it here to give others a quicker way to solve the same problem. It took me quite a while and some help from Guillaume to figure this out.

The Geronimo transaction manager that is used in Apache Karaf inside the Aries transaction manager bundle can recover transactions. By default this feature is switched off in older Karaf versions. In Apache Karaf 4.0.4 this has changed.

The config etc/org.apache.aries.transaction.cfg contains this setting

aries.transaction.recoverable = true

It activates the recovery support. The downside of this is that all JDBC and JMS connections must then implement NamedXAResource. This is outside the standard, so most JDBC and JMS providers do not support it.

The solution for ActiveMQ is to use the JCAPoolingConnectionFactory. For JDBC you can wrap your XADataSource using Aries transaction JDBC. It provides pooling that also supports recovery.

If you are happy with how your application worked before you can simply set the above switch back to false. The transactions will then work like in older Apache Karaf versions.

Sunday, October 24, 2021
 
Rohit
answered 1 Month ago