_peek

trait _peek[A]

Stream Inspection Interface

The methods below do not change stream data; they allow it to be inspected in a variety of ways.

Source
_peek.scala
class java.lang.Object
trait scala.Matchable
class Any
trait _build
object Stream

Def

inline def peek[U](f: A => U): Stream[A]

Inspect

The given function will be run with every passing stream element.

  (1 <> 5).stream.peek(_.tp).drain

  // Output
  1
  2
  3
  4
  5
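
The pass-through behavior described above can be modeled with the standard library alone. This is a minimal sketch on a plain `scala.collection.Iterator`, not Scalqa's implementation; `peekSketch` and `peekDemo` are hypothetical names introduced only for illustration.

```scala
// Sketch only: models the peek idea on a standard Iterator, not on Scalqa's Stream.
// `peekSketch` is a hypothetical helper, not part of any library.
def peekSketch[A, U](src: Iterator[A])(f: A => U): Iterator[A] =
  src.map { a => f(a); a } // run the side effect, then pass the element through unchanged

// Collects what the inspecting function saw and what came out of the iterator.
def peekDemo(): (List[Int], List[Int]) =
  val seen = scala.collection.mutable.ListBuffer.empty[Int]
  val out  = peekSketch((1 to 5).iterator)(seen += _).toList
  (seen.toList, out)
```

In this sketch, `peekDemo()` returns the same five numbers twice: once as observed by the function and once as the untouched output.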
Source
_peek.scala
inline def peekEmpty[U](f: => U): Stream[A]

Peek empty

The given function is executed once, and only if the stream is empty.

  (1 <> 10).stream.drop(_ > 0).peekEmpty("Stream is empty".tp).drain

  // Output
  Stream is empty
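
To make the "once, only if empty" rule concrete, here is a hedged sketch over a standard `Iterator`. It assumes the consumer always calls `hasNext` before `next()`; `peekEmptySketch` and `peekEmptyDemo` are hypothetical names and do not reflect how Scalqa implements `peekEmpty`.

```scala
// Sketch only: a standard-library model of the "run once if empty" rule.
// `peekEmptySketch` is a hypothetical helper, not Scalqa's implementation.
def peekEmptySketch[A, U](src: Iterator[A])(f: => U): Iterator[A] =
  new Iterator[A]:
    private var checked = false
    def hasNext: Boolean =
      val more = src.hasNext
      if !checked then
        checked = true      // only the very first check can detect an empty source
        if !more then
          f                 // evaluate the by-name argument exactly once
          ()
      more
    def next(): A = src.next()

// Returns how many times the callback ran for an empty and a non-empty source.
def peekEmptyDemo(): (Int, Int) =
  var calls = 0
  peekEmptySketch(Iterator.empty[Int])(calls += 1).foreach(_ => ())
  val onEmpty = calls
  calls = 0
  peekEmptySketch(Iterator(1, 2, 3))(calls += 1).foreach(_ => ())
  (onEmpty, calls)
```

Here `peekEmptyDemo()` yields one call for the empty source and none for the non-empty one.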
Source
_peek.scala
inline def peekEnd[U](f: (Int, Time.Length) => U): Stream[A]

Peek end

The given function is executed once, when the stream is exhausted.

The function receives the total element count and the Time.Length it took for all elements to pass.

  (1 <> 10).stream
    .peek(_ => J.sleep(100.Millis))
    .peekEnd((cnt,time) => "Elements: "+cnt+"  total time: "+time.tag tp())
    .drain

  // Output
  Elements: 10  total time: 0.904106700 sec

Note: This will not run for empty streams
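
The count-and-duration callback can be sketched with a standard `Iterator` and `scala.concurrent.duration.FiniteDuration` standing in for `Time.Length`. This is an illustration under assumptions: the clock is started when the first element arrives, which may differ from Scalqa's actual measurement, and `peekEndSketch` and `peekEndDemo` are hypothetical names.

```scala
import scala.concurrent.duration.{Duration, FiniteDuration}

// Sketch only: FiniteDuration stands in for Scalqa's Time.Length.
// `peekEndSketch` is a hypothetical helper, not Scalqa's implementation.
def peekEndSketch[A, U](src: Iterator[A])(f: (Int, FiniteDuration) => U): Iterator[A] =
  new Iterator[A]:
    private var count   = 0
    private var startNs = 0L
    private var done    = false
    def hasNext: Boolean =
      val more = src.hasNext
      if !more && !done then
        done = true
        if count > 0 then   // skipped for empty sources, as the note above states
          f(count, Duration.fromNanos(System.nanoTime() - startNs))
          ()
      more
    def next(): A =
      val a = src.next()
      if count == 0 then startNs = System.nanoTime() // assumption: clock starts at the first element
      count += 1
      a

// Records the counts reported for a three-element source and for an empty one.
def peekEndDemo(): List[Int] =
  val counts = scala.collection.mutable.ListBuffer.empty[Int]
  peekEndSketch(Iterator(1, 2, 3))((n, _) => counts += n).foreach(_ => ())
  peekEndSketch(Iterator.empty[Int])((n, _) => counts += n).foreach(_ => ())
  counts.toList
```

In this sketch only the non-empty source triggers the callback, so `peekEndDemo()` contains the single count `3`.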

Source
_peek.scala
inline def peekEvents[U](f: Stream.Custom.Event => U): Stream[A]

Custom events

Allows multiple monitoring events to be set up through Stream.Custom.Event.

 (1 <> 1000).stream
   .peek(_ => J.sleep(5.Millis))
   .peekEvents(e => {
     e.onBeforeFirst(t   => "Started at: "+ t.dayTime.tag tp())
     e.onEvery(1.Second, (c,t) => "  Processed "+c+" in "+t.tag tp())
     e.onAfterLast((c,t) => "Finished in: "+ t.tag + ",  Element count: " + c tp())
   })
   .drain

 // Output

 Started at: 14:05:39.333
   Processed 187 in 1.018583400 sec
   Processed 371 in 2.020508100 secs
   Processed 557 in 3.021843300 secs
   Processed 743 in 4.023837400 secs
   Processed 928 in 5.026982 secs
 Finished in: 5.411673300 secs, Element count: 1000
Source
_peek.scala
inline def peekIndexed[U](f: (Int, A) => U, start: Int): Stream[A]

Indexed peek

The given function will be executed with every passing element and its index.

  ('a' <> 'f').stream.peekIndexed((i,c) => (""+i+" : "+c).tp, 1).drain

  // Output
  1 : a
  2 : b
  3 : c
  4 : d
  5 : e
  6 : f

Note: By default, indexing starts at 0, but a different start value can be specified.
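
The effect of the start index can be shown with a small standard-library sketch. It pairs each element with `zipWithIndex` and adds the offset; `peekIndexedSketch` and `peekIndexedDemo` are hypothetical names, and the parameter order here is chosen for convenience rather than copied from Scalqa.

```scala
// Sketch only: models indexed inspection on a standard Iterator.
// `peekIndexedSketch` is a hypothetical helper; its parameter layout is an assumption.
def peekIndexedSketch[A, U](src: Iterator[A], start: Int = 0)(f: (Int, A) => U): Iterator[A] =
  src.zipWithIndex.map { case (a, i) =>
    f(i + start, a) // index is offset by `start`
    a               // element passes through unchanged
  }

// Builds the lines that an indexed peek starting at 1 would report for 'a' to 'c'.
def peekIndexedDemo(): List[String] =
  val lines = scala.collection.mutable.ListBuffer.empty[String]
  peekIndexedSketch(('a' to 'c').iterator, start = 1)((i, c) => lines += s"$i : $c").foreach(_ => ())
  lines.toList
```

With `start = 1`, the sketch reports `1 : a`, `2 : b`, `3 : c`; leaving `start` at its default would begin at 0.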

Source
_peek.scala

Custom monitor

Adds a pre-built Stream.Custom.Event.Monitor.

If the passed monitor tests as void (.isEmpty), the operation is ignored.

Source
_peek.scala
inline def peekStart[U](f: Time => U): Stream[A]

Peek start

The given function is executed once, just before the first element is about to pass.

  ('a' <> 'f').stream.peekStart(time => ("Started at: "+time).tp).drain

Note: This will not run for empty streams
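
A hedged standard-library sketch of the "once, before the first element" rule follows. `java.time.LocalTime` stands in for Scalqa's `Time`, and `peekStartSketch` and `peekStartDemo` are hypothetical names, not Scalqa's implementation.

```scala
import java.time.LocalTime

// Sketch only: LocalTime stands in for Scalqa's Time.
// `peekStartSketch` is a hypothetical helper, not Scalqa's implementation.
def peekStartSketch[A, U](src: Iterator[A])(f: LocalTime => U): Iterator[A] =
  new Iterator[A]:
    private var started = false
    def hasNext: Boolean = src.hasNext
    def next(): A =
      if !started && src.hasNext then
        started = true
        f(LocalTime.now()) // fires once, just before the first element is handed over
        ()
      src.next()

// Returns how many times the callback ran for a non-empty and an empty source.
def peekStartDemo(): (Int, Int) =
  var calls = 0
  peekStartSketch(('a' to 'f').iterator)(_ => calls += 1).foreach(_ => ())
  val onNonEmpty = calls
  calls = 0
  peekStartSketch(Iterator.empty[Char])(_ => calls += 1).foreach(_ => ())
  (onNonEmpty, calls)
```

In this sketch the callback runs once for the six-element source and never for the empty one, matching the note above.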

Source
_peek.scala