_peek
Stream Inspection Interface
The methods below do not change stream data; they let you inspect it in a variety of ways.
- Source
- _peek.scala
Def
Inspect
The given function will be run with every passing stream element.
(1 <> 5).stream.peek(_.tp).drain
// Output
1
2
3
4
5
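For readers more familiar with the standard library, the same idea can be sketched with plain `Iterator.tapEach` (Scala 2.13+). This is only an analogue under that assumption, not Scalqa code; `PeekSketch` and its `peek` helper are hypothetical names introduced for illustration.

```scala
// Standard-library analogue of an inspecting step; not part of Scalqa.
object PeekSketch {
  // Returns a lazy iterator that calls `f` on each element as it passes through,
  // leaving the elements themselves unchanged.
  def peek[A](it: Iterator[A])(f: A => Unit): Iterator[A] = it.tapEach(f)
}

// Usage: PeekSketch.peek((1 to 5).iterator)(println).foreach(_ => ())
```

Nothing is printed until the iterator is consumed, which is the role `drain` plays in the example above.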
Peek empty
The given function is executed once, and only if the stream is empty.
(1 <> 10).stream.drop(_ > 0).peekEmpty("Stream is empty".tp).drain
// Output
Stream is empty
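One way to picture this behaviour is a thin wrapper around a standard `Iterator`. The sketch below is an assumption about how such a step could work, not Scalqa's implementation; `PeekEmptySketch` is a hypothetical name.

```scala
// Illustrative wrapper over a plain Iterator; not part of Scalqa.
object PeekEmptySketch {
  // `onEmpty` runs at most once, on the first `hasNext` call,
  // and only if the underlying iterator has no elements at all.
  def peekEmpty[A](it: Iterator[A])(onEmpty: => Unit): Iterator[A] = new Iterator[A] {
    private var checked = false
    def hasNext: Boolean = {
      val has = it.hasNext
      if (!checked) { checked = true; if (!has) onEmpty }
      has
    }
    def next(): A = { checked = true; it.next() }
  }
}
```

Reaching the end of a non-empty iterator does not trigger the callback, because the emptiness check is made only before the first element.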
Peek end
The given function is executed once, when the stream is exhausted.
The function receives the total element count and the Time.Length it took for all elements to pass.
(1 <> 10).stream
.peek(_ => J.sleep(100.Millis))
.peekEnd((cnt,time) => "Elements: "+cnt+" total time: "+time.tag tp())
.drain
// Output
Elements: 10 total time: 0.904106700 sec
Note: This will not run for empty streams
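The counting and timing can be sketched over a standard `Iterator`. This is a hedged illustration, not Scalqa's code: `PeekEndSketch` is a hypothetical name, `FiniteDuration` stands in for Time.Length, and the clock is assumed to start when the first element passes, which is what the roughly 0.9 s reported for ten 100 ms elements above suggests.

```scala
import scala.concurrent.duration._

// Illustrative wrapper over a plain Iterator; not part of Scalqa.
object PeekEndSketch {
  // When the iterator runs out, calls `onEnd` once with the element count and the
  // time elapsed since the first element passed. Skipped entirely for empty input.
  def peekEnd[A](it: Iterator[A])(onEnd: (Int, FiniteDuration) => Unit): Iterator[A] =
    new Iterator[A] {
      private var count = 0
      private var startNanos = 0L
      private var done = false
      def hasNext: Boolean = {
        val has = it.hasNext
        if (!has && !done) {
          done = true
          if (count > 0) onEnd(count, (System.nanoTime() - startNanos).nanos)
        }
        has
      }
      def next(): A = {
        val a = it.next()
        if (count == 0) startNanos = System.nanoTime() // assumed: timing starts at first element
        count += 1
        a
      }
    }
}
```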
Custom events
Allows setting up multiple monitoring events via Stream.Custom.Events.
(1 <> 1000).stream
.peek(_ => J.sleep(5.Millis))
.peekEvents(e => {
e.onBeforeFirst(t => "Started at: "+ t.dayTime.tag tp())
e.onEvery(1.Second, (c,t) => " Processed "+c+" in "+t.tag tp())
e.onAfterLast((c,t) => "Finished in: "+ t.tag + ", Element count: " + c tp())
})
.drain
// Output
Started at: 14:05:39.333
Processed 187 in 1.018583400 sec
Processed 371 in 2.020508100 secs
Processed 557 in 3.021843300 secs
Processed 743 in 4.023837400 secs
Processed 928 in 5.026982 secs
Finished in: 5.411673300 secs, Element count: 1000
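The periodic `onEvery` report can be approximated over a standard `Iterator` as follows. This is a rough sketch under stated assumptions, not how Scalqa schedules events: `PeekEverySketch`, `peekEvery`, and the injectable `clock` parameter are hypothetical, and the check is made only when an element passes, so a report can arrive slightly after the interval has elapsed (as the timings in the output above also do).

```scala
// Illustrative periodic reporter over a plain Iterator; not part of Scalqa.
object PeekEverySketch {
  // Calls `report(count, elapsedNanos)` whenever at least `everyNanos` have passed
  // since the previous due time. `clock` is injectable so the logic can be tested.
  def peekEvery[A](it: Iterator[A], everyNanos: Long,
                   clock: () => Long = () => System.nanoTime())
                  (report: (Int, Long) => Unit): Iterator[A] = {
    var started = false
    var start = 0L
    var nextDue = 0L
    var count = 0
    it.map { a =>
      val now = clock()
      if (!started) { started = true; start = now; nextDue = now + everyNanos }
      count += 1
      if (now >= nextDue) { report(count, now - start); nextDue += everyNanos }
      a
    }
  }
}
```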
Indexed peek
The given function will be executed with every passing element and its index.
('a' <> 'f').stream.peekIndexed((i,c) => (""+i+" : "+c).tp, 1).drain
// Output
1 : a
2 : b
3 : c
4 : d
5 : e
6 : f
Note: Indexing starts at 0 by default, but a different start index can be specified (1 in the example above).
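A comparable effect with the standard library can be sketched using `zipWithIndex` plus an offset. This is an analogue only; `PeekIndexedSketch` is a hypothetical name, and the parameter order here is an illustrative choice, not Scalqa's signature.

```scala
// Standard-library analogue of an indexed inspecting step; not part of Scalqa.
object PeekIndexedSketch {
  // Calls `f(index, element)` for each passing element and returns the element unchanged.
  // `start` is the index given to the first element (0 by default).
  def peekIndexed[A](it: Iterator[A], start: Int = 0)(f: (Int, A) => Unit): Iterator[A] =
    it.zipWithIndex.map { case (a, i) => f(i + start, a); a }
}

// Usage: PeekIndexedSketch.peekIndexed(('a' to 'f').iterator, 1)((i, c) => println(s"$i : $c")).foreach(_ => ())
```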
Custom monitor
Adds a pre-built Stream.Custom.Event.Monitor.
If the passed monitor tests as void (.isEmpty), the operation is ignored.
Peek start
The given function is executed once, just before the first element is about to pass.
('a' <> 'f').stream.peekStart(time => ("Started at: "+time).tp).drain
Note: This will not run for empty streams