_build

trait _Build[A] extends _extend[A] with _filter[A] with _group[A] with _map[A] with _mutate[A] with _order[A] with _parallel[A] with _peek[A] with _zip[A]

Build Stream Interface

Build methods extend the stream pipeline, creating functionality which will add, remove, or modify stream elements when they finally start moving through the pipeline.

Once a single stream build method is invoked, the original stream object must not be used; all further processing must be done with the newly created stream.

Source
__.scala
trait _zip
trait _peek
trait _parallel
trait _order
trait _mutate
trait _map
trait _group
trait _filter
trait _drop
trait _take
trait _extend
class java.lang.Object
trait scala.Matchable
class Any
object Stream

Def

@targetName("join")
inline def +(v: A): Stream[A]

Alias for join

Creates a new Stream with given element appended to current Stream

  ((1 <> 5).stream + 99 + 100).tp

  // Output
  Stream(1, 2, 3, 4, 5, 99, 100)
Inherited from
_extend
Source
_extend.scala
@targetName("joinAll")
inline def ++(v: Stream[A]): Stream[A]

Alias for joinAll

Creates a new Stream with given elements appended to current Stream

  (('1' <> '9').stream ++ ('a' <> 'd') ++ ('A' <> 'D')).tp

  // Output
  Stream(1, 2, 3, 4, 5, 6, 7, 8, 9, a, b, c, d, A, B, C, D)
Inherited from
_extend
Source
_extend.scala
@targetName("joinAllAt")
inline def ++@(index: Int, v: Stream[A]): Stream[A]

Alias for joinAllAt

Creates a new Stream with given elements inserted into current Stream at given index

If index is out of range, the elements are prepended or appended

   (('a' <> 'f').stream ++@ (3, 'X' <> 'Z')).tp

   // Output
   Stream(a, b, c, X, Y, Z, d, e, f)
Inherited from
_extend
Source
_extend.scala
@targetName("joinAt")
inline def +@(index: Int, v: A): Stream[A]

Alias for joinAt

Creates a new Stream with given element inserted into current Stream at given index

If index is out of range, the element is prepended or appended

 (('a' <> 'd').stream +@ (2, 'X')).tp

  // Output
  Stream(a, b, X, c, d)
Inherited from
_extend
Source
_extend.scala
inline def collect[B](f: scala.PartialFunction[A, B]): Stream[B]

Partial map

Creates a new Stream by applying a partial function to all elements of current Stream on which the function is defined.

(0 <>> 26).stream.collect{
 case i if(i%2==0) => ('a' + i).toChar
}.tp

// Output
Stream(a, c, e, g, i, k, m, o, q, s, u, w, y)

Note:

  • collect is functionally similar to mapOpt, which is preferable in most cases.
  • 'partialMap' would be a better name for this operation, but 'collect' is an established Scala convention.
Inherited from
_map
Source
_map.scala
inline def default(v: => A): Stream[A]

Default element

If current Stream is empty, the given element will be appended

Otherwise current Stream will not change

 (1 <>> 1).stream.default(99).tp // Prints Stream(99)

 (1 <>> 5).stream.default(99).tp // Prints Stream(1, 2, 3, 4)
Inherited from
_extend
Source
_extend.scala
inline def drop(f: A => Boolean): Stream[A]

Reverse filter

Disallows Stream elements satisfying the given function

  (0 <>> 10).stream.drop(_ > 5).tp

  // Output
  Stream(0, 1, 2, 3, 4, 5)

Note: Scala equivalent is called "filterNot"

Inherited from
_drop
Source
_drop.scala
inline def DROP(f: A => Boolean): Stream[A]

Heavy reversed filter

Disallows Stream elements satisfying the given function

DROP is functionally equivalent to drop, but is fully inlined. It makes compiled code larger, but guarantees the best possible performance on large streams.

Inherited from
_drop
Source
_drop.scala
inline def dropDuplicates: Stream[A]

Duplicates reversed filter

Drops elements equal to the element passed in the prior position

Note: To generally get rid of all duplicates, the stream must be sorted to arrange duplicates in sequence

(1 <> 10).stream.repeat(3).dropDuplicates.tp // Prints Stream(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
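
The note about sorting can be illustrated with a minimal sketch that uses only the Scala standard library, not Scalqa itself. The helper name dropConsecutiveDuplicates is hypothetical; it mimics the documented behavior of comparing each element only with the one just before it.

```scala
object DropDuplicatesSketch {
  // Hypothetical helper: skips an element only when it equals the element just before it.
  def dropConsecutiveDuplicates[A](xs: List[A]): List[A] =
    xs.foldLeft(List.empty[A]) {
      case (acc @ (last :: _), x) if last == x => acc      // same as prior element: skip
      case (acc, x)                            => x :: acc // otherwise keep
    }.reverse

  def main(args: Array[String]): Unit = {
    val unsorted = List(1, 2, 1, 2)
    // No two equal elements are adjacent, so nothing is dropped.
    println(dropConsecutiveDuplicates(unsorted))
    // Sorting first makes equal elements adjacent, so all duplicates go away.
    println(dropConsecutiveDuplicates(unsorted.sorted))
  }
}
```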
Inherited from
_drop
Source
_drop.scala
inline def dropDuplicatesBy[B](f: A => B): Stream[A]

Mapped duplicates reversed filter

Drops elements which evaluate to the same value as the element passed in the prior position

Note: To generally get rid of all duplicates, the stream must be sorted by the mapping function

  (1 <> 100).stream.dropDuplicatesBy(_.toString.length).tp

  // Output
  Stream(1, 10, 100)
Inherited from
_drop
Source
_drop.scala
inline def dropEvery(nTh: Int): Stream[A]

Every Nth element reversed filter

Drops every nTh element

  (1 <> 10).stream.dropEvery(3).tp   // Prints: Stream(1, 2, 4, 5, 7, 8, 10)
Inherited from
_drop
Source
_drop.scala
inline def dropFirst(n: Int): Stream[A]

Head reversed filter

Drops given number of first elements

  (1 <> 10).stream.dropFirst(3).tp  // Prints  Stream(4, 5, 6, 7, 8, 9, 10)
Inherited from
_drop
Source
_drop.scala
inline def dropLast(n: Int): Stream[A]

Tail reversed filter

Drops given number of elements coming last

  (1 <> 10).stream.dropLast(3).tp  // Prints  Stream(1, 2, 3, 4, 5, 6, 7)

Note: This method will block on unlimited streams

Inherited from
_drop
Source
_drop.scala
inline def dropOnly(v: A): Stream[A]

Single value reversed filter

Drops only specified value.

  (1 <> 4).stream.dropOnly(3).tp

  // Output
  Stream(1, 2, 4)

Note: dropOnly is more efficient than general filter ".drop(_ == value)", because there is no function involved.

Inherited from
_drop
Source
_drop.scala
inline def dropRange(i: Int.Range): Stream[A]

Range reversed filter

Only allows elements outside the specified sequential range

  ('a' <> 'f').stream.dropRange(2 <> 3).tp

  // Output
  Stream(a, b, e, f)

Note: Range indexing starts from 0

Inherited from
_drop
Source
_drop.scala
inline def dropSequence(seq: Stream[A]): Stream[A]
Inherited from
_drop
Source
_drop.scala
inline def dropSequenceBy[B](f: A => B, seq: Stream[B]): Stream[A]
Inherited from
_drop
Source
_drop.scala
inline def dropValues(v: Stream[A]): Stream[A]

Multi value reversed filter

Drops only provided set of values

  (0 <>> 10).stream.dropValues(8,3,5).tp

  // Output
  Stream(0, 1, 2, 4, 6, 7, 9)

Note: dropValues is macro optimized when given value tuples sized from 2 to 5

Inherited from
_drop
Source
_drop.scala
inline def dropValuesBy[B](f: A => B, v: Stream[B]): Stream[A]

Mapped multi value reversed filter

Drops only values, which convert to provided set of values

  (0 <>> 10).stream.dropValuesBy(_ % 5, (1,3) ).tp

  // Output
  Stream(0, 2, 4, 5, 7, 9)

Note: dropValuesBy is macro optimized when given value tuples sized from 2 to 5

Inherited from
_drop
Source
_drop.scala
inline def dropVoid(using d: Any.Def.Void[A]): Stream[A]

Void value reversed filter

Drops elements which test to be void

Inherited from
_drop
Source
_drop.scala
inline def dropWhile(f: A => Boolean): Stream[A]

Conditional reversed head filter

Discards first consecutive elements satisfying the condition

  def stream = (1 <> 5).stream ++ (1 <> 5)

  stream.tp                     // Prints Stream(1, 2, 3, 4, 5, 1, 2, 3, 4, 5)

  stream.dropWhile(_ <= 3).tp   // Prints Stream(4, 5, 1, 2, 3, 4, 5)

Note: Everything starting from the first non-compliant element will be allowed (including later compliant elements)

Inherited from
_drop
Source
_drop.scala
inline def enablePreview: Stream[A] & Stream.Preview[A]

Enables preview capabilities

Returns Stream.Preview, which allows pre-loading and inspecting elements even before they are read

  def strm : Stream[String] = ???

  if(strm.enablePreview.previewSize > 1000) "Stream is over 1K".TP
Inherited from
_mutate
Source
_mutate.scala
inline def enableSize: Stream[A] & Able.Size

Adds sizing information

If Stream already has sizing, this method is a simple cast, otherwise, the elements might be buffered and counted.

Inherited from
_mutate
Source
_mutate.scala
inline def FILTER(f: A => Boolean): Stream[A]

Legacy heavy filter

Filters Stream elements according to given function

FILTER is functionally equivalent to filter, but is fully inlined. It makes compiled code larger, but guarantees the best possible performance on large streams.

Note: TAKE is usually used instead.

Inherited from
_Filter
Source
__.scala
inline def filter(f: A => Boolean): Stream[A]

Legacy filter

Filters Stream elements according to given function

  (0 <>> 10).stream.filter(_ > 5).tp

  // Output
  Stream(6, 7, 8, 9)

Note: take is usually used instead.

Inherited from
_Filter
Source
__.scala
inline def FLAT_MAP[B](f: A => Stream[B])(using s: Specialized[B]): s.Stream

Heavy flat map

FLAT_MAP is functionally equivalent to flatMap, but is fully inlined. It makes compiled code larger, but guarantees the best possible performance on large streams.

Inherited from
_map
Source
_map.scala
inline def flatMap[B](f: A => Stream[B])(using s: Specialized[B]): s.Stream

Flat map

Creates a new Stream by applying given function to all elements of current Stream and concatenating the results

(1 <> 3).stream.flatMap(i => Stream(i, i*10, i*100)).tp

// Output
Stream(1, 10, 100, 2, 20, 200, 3, 30, 300)
Inherited from
_map
Source
_map.scala
inline def flatten[B](using d: Any.Def.ToStream[A, B], s: Specialized[B]): s.Stream

Converts a stream of streams into a flat stream

The operation will only compile if stream elements are streams or stream convertible entities, like Able.Stream, Iterable, Iterator, etc.

val vs: Stream[Stream[Char]] = Stream(
  'a' <> 'd',
  Pack('x', 'y', 'z'),
  Vector('v', 'e', 'c', 't', 'o', 'r'))

vs.flatten.tp // Prints Stream(a, b, c, d, x, y, z, v, e, c, t, o, r)
Inherited from
_map
Source
_map.scala
inline def group(f: (A, A) => Boolean, peek: (A, Boolean) => U): Stream[Stream[A]]

Group by test

Puts elements in the same group based on a function test for every two consecutive elements

   // Putting Ints into groups of 3

   (0 <> 20).stream.group(_ / 3 == _ / 3).print

   // Output
   ---------------
   ?
   ---------------
   Stream(0, 1, 2)
   Stream(3, 4, 5)
   Stream(6, 7, 8)
   Stream(9, 10, 11)
   Stream(12, 13, 14)
   Stream(15, 16, 17)
   Stream(18, 19, 20)
   ---------------
Value Params
f

function for two consecutive elements. If 'false' is returned, the second tested element will start a new group

peek

side-effect convenience function will run for each element. Boolean parameter indicates if the element starts a new group

Inherited from
_group
Source
_group.scala
inline def group: Stream[Stream[A]]

Simple grouping

Puts consecutive elements in the same group if they are equal

   def stream =  Stream(1, 2, 3).repeat(3)

   stream.tp           // Prints Stream(1, 1, 1, 2, 2, 2, 3, 3, 3)

   stream.group.print  // Prints  ------------
                                ?
                                ------------
                                Stream(1, 1, 1)
                                Stream(2, 2, 2)
                                Stream(3, 3, 3)
                                ------------

Note: Non-consecutive equal elements will end up in different groups. Prior ordering might be needed
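
As a rough illustration of consecutive grouping, the sketch below reimplements the idea on a standard-library List. It is not the Scalqa implementation; groupConsecutive and its parameter names are assumptions made for this example. A new group starts whenever the test on the previous and current element returns false.

```scala
object GroupSketch {
  // Hypothetical helper: consecutive elements stay in one group while same(prev, cur) holds.
  def groupConsecutive[A](xs: List[A])(same: (A, A) => Boolean): List[List[A]] =
    xs.foldLeft(List.empty[List[A]]) {
      // cur.head is the most recently added element of the current group
      case (cur :: done, x) if same(cur.head, x) => (x :: cur) :: done
      // test failed (or no group yet): x starts a new group
      case (acc, x)                              => List(x) :: acc
    }.map(_.reverse).reverse

  def main(args: Array[String]): Unit = {
    // Equal but non-adjacent elements land in separate groups.
    println(groupConsecutive(List(1, 1, 2, 1))(_ == _))
    // Sorting first brings equal elements together.
    println(groupConsecutive(List(1, 1, 2, 1).sorted)(_ == _))
  }
}
```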

Inherited from
_group
Source
_group.scala
inline def groupBy(f: A => Any, more: A => Any*): Stream[Stream[A]]

Grouping on properties

Puts consecutive elements in the same group if all the specified properties are equal

When properties change, a new group is started

    ('#' <> '|').stream.groupBy(_.isLetter, _.isDigit).print

   // Output
   ---------------------------------------------------------------------------------
   ?
   ---------------------------------------------------------------------------------
   Stream(#, $, %, &, ', (, ), *, +, ,, -, ., /)
   Stream(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
   Stream(:, ;, <, =, >, ?, @)
   Stream(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y, Z)
   Stream([, \, ], ^, _, `)
   Stream(a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p, q, r, s, t, u, v, w, x, y, z)
   Stream({, |)
   ---------------------------------------------------------------------------------
Value Params
properties

a set of functions, each indicating element property

Inherited from
_group
Source
_group.scala
inline def groupEvery(cnt: Int): Stream[Stream[A]]

Fixed size groups

Puts consecutive elements into fixed size groups

('a' <> 'z').stream.groupEvery(8).print

// Output
-------------------------
?
-------------------------
Stream(a, b, c, d, e, f, g, h)
Stream(i, j, k, l, m, n, o, p)
Stream(q, r, s, t, u, v, w, x)
Stream(y, z)
-------------------------
Inherited from
_group
Source
_group.scala
inline def groupWith[B](f: A => B): Stream[(B, Stream[A])]

Grouping on a property

Puts consecutive elements in the same group if their properties are equal

  (0 <> 20).stream.groupWith(_ / 3).print

  // Output
  -- -------------
  _1 _2
  -- -------------
  0  Stream(0, 1, 2)
  1  Stream(3, 4, 5)
  2  Stream(6, 7, 8)
  3  Stream(9, 10, 11)
  4  Stream(12, 13, 14)
  5  Stream(15, 16, 17)
  6  Stream(18, 19, 20)
  -- -------------

Note: groupWith also returns the grouped property value (unlike groupBy)

Value Params
properties

a set of functions, each indicating an element property

Inherited from
_group
Source
_group.scala
inline def hideSizeData: Stream[A]

Lose size information

Many streams return sizeLongOpt, knowing their current size

hideSizeData drops sizing information, so some optimizations will not be available

This is primarily for testing and debugging

Inherited from
_mutate
Source
_mutate.scala
inline def join(v: A): Stream[A]

Join element

Creates a new Stream with given element appended to current Stream

  (1 <> 5).stream.join(99).join(100).tp

  // Output
  Stream(1, 2, 3, 4, 5, 99, 100)
Inherited from
_extend
Source
_extend.scala
inline def joinAll(v: Stream[A]): Stream[A]

Join all

Creates a new Stream with given elements appended to current Stream

  ('1' <> '9').stream.joinAll('a' <> 'd').joinAll('A' <> 'D').tp

  // Output
  Stream(1, 2, 3, 4, 5, 6, 7, 8, 9, a, b, c, d, A, B, C, D)
Inherited from
_extend
Source
_extend.scala
inline def joinAllAt(index: Int, v: Stream[A]): Stream[A]

Join all at position

Creates a new Stream with given elements inserted into current Stream at given index

If index is out of range, the elements are prepended or appended

   ('a' <> 'f').stream.joinAllAt(3, 'X' <> 'Z').tp

   // Output
   Stream(a, b, c, X, Y, Z, d, e, f)
Inherited from
_extend
Source
_extend.scala
inline def joinAt(index: Int, v: A): Stream[A]

Join element at position

Creates a new Stream with given element inserted into current Stream at given index

If index is out of range, the element is prepended or appended

 ('a' <> 'd').stream.joinAt(2, 'X').tp

  // Output
  Stream(a, b, X, c, d)
Inherited from
_extend
Source
_extend.scala
inline def load: Stream[A] & Able.Size

Preload all

Immediately loads all stream elements into memory, so they are no longer dependent on underlying sources.

  def s : Stream[String] = ???

  s.load

  // is functionally same as

  s.toBuffer.stream
Inherited from
_mutate
Source
_mutate.scala
inline def map[B](f: A => B)(using s: Specialized[B]): s.Stream

Simple map

Creates a new Stream where each element is a result of applying given function to current Stream elements

(0 <>> 26).stream.map(i => ('a' + i).toChar).tp

// Output
Stream(a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p, q, r, s, t, u, v, w, x, y, z)
Inherited from
_map
Source
_map.scala
inline def MAP[B](f: A => B)(using s: Specialized[B]): s.Stream

Heavy map

MAP is functionally equivalent to map, but is fully inlined. It makes compiled code larger, but guarantees the best possible performance on large streams.

Inherited from
_map
Source
_map.scala
inline def MAP_OPT(f: A => OPT)(using o: Specialized.Opt[B, OPT], s: Specialized[B]): s.Stream

Heavy optional map

MAP_OPT is functionally equivalent to mapOpt, but is fully inlined. It makes compiled code larger, but guarantees the best possible performance on large streams.

Inherited from
_map
Source
_map.scala
inline def mapIf(condition: A => Boolean, fun: A => A): Stream[A]

Conditional map

This is a synthetic operation which is inlined as:

map(v => if(condition(v)) fun(v) else v)

In some circumstances using "mapIf" does not make sense; in others it is really useful.

Inherited from
_map
Source
_map.scala
inline def mapOpt[B,OPT<:Any.Opt[B]](f: A => OPT)(using s: Specialized[B]): s.Stream

Optional map

Creates a new Stream where each element is a result of applying given function to Stream elements. If the function returns void option, the element is dropped.

(1 <> 10).stream.mapOpt(i => if(i % 2 == 0) "Even_"+i else VOID).tp

// Output
Stream(Even_2, Even_4, Even_6, Even_8, Even_10)

Pattern matching can be used, but the last void case must always be provided explicitly:

(0 <>> 26).stream.mapOpt{
 case i if(i % 2 == 0) => ('a' + i).toChar
 case _                => VOID
}.tp

// Output
Stream(a, c, e, g, i, k, m, o, q, s, u, w, y)

Note:

  • All cases must return the same type, otherwise the operation will not compile.
  • mapOpt is functionally similar to collect, but is faster (PartialFunction in collect has to be evaluated twice)
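
The double evaluation mentioned in the last note can be sketched with the standard library alone. The code below is not how Scalqa implements collect; it assumes a collect that first asks isDefinedAt and then calls apply, and compares it with an Option-returning function standing in for mapOpt. All names here (twoStepCollect, optionMap, guardCalls) are hypothetical.

```scala
object CollectVsOptionSketch {
  // Returns the mapped values and the number of guard evaluations
  // for a collect written as "isDefinedAt, then apply".
  def twoStepCollect(xs: List[Int]): (List[Char], Int) = {
    var guardCalls = 0
    def isEven(i: Int): Boolean = { guardCalls += 1; i % 2 == 0 }
    val pf: PartialFunction[Int, Char] = { case i if isEven(i) => ('a' + i).toChar }
    // The guard runs once in isDefinedAt and again in apply for every match.
    val out = xs.flatMap(x => if (pf.isDefinedAt(x)) List(pf(x)) else Nil)
    (out, guardCalls)
  }

  // Same mapping through an Option-returning function: the guard runs once per element.
  def optionMap(xs: List[Int]): (List[Char], Int) = {
    var guardCalls = 0
    def isEven(i: Int): Boolean = { guardCalls += 1; i % 2 == 0 }
    val f: Int => Option[Char] = i => if (isEven(i)) Some(('a' + i).toChar) else None
    val out = xs.flatMap(x => f(x).toList)
    (out, guardCalls)
  }
}
```

Both functions return the same characters; only the guard count differs, which is the cost the note refers to.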
Inherited from
_map
Source
_map.scala
@targetName("nonEmptyOpt")
inline def nonEmptyOpt: Opt[Stream[A]]
Inherited from
_mutate
Source
_mutate.scala

Parallel

Returns Stream.Flow with parallel execution

Each consecutive element will be sent to a new thread for processing

  (1 <> 5).stream
     .parallel
     .map("Value: " + _ + "\t" + Thread.currentThread.getName)
     .foreach(println)

  // Possible Output
  Value: 1    ForkJoinPool.commonPool-worker-9
  Value: 3    ForkJoinPool.commonPool-worker-11
  Value: 2    main
  Value: 4    ForkJoinPool.commonPool-worker-2
  Value: 5    ForkJoinPool.commonPool-worker-4
Inherited from
_parallel
Source
_parallel.scala
def parallelIf(v: Boolean): Stream.Flow[A]

Conditionally parallel

Returns Stream.Flow with parallel or sequential implementation, depending on given parameter

   (1 <> 50).stream.parallelIf(true).isParallel   // Returns true

   (1 <> 50).stream.parallelIf(false).isParallel  // Returns false
Inherited from
_parallel
Source
_parallel.scala
def parallelIfOver(threshold: Int): Stream.Flow[A]

Conditionally parallel

Returns Stream.Flow with parallel or sequential implementation, depending on whether the stream's element count is equal to or greater than the given threshold

  (1 <> 50).stream.parallelIfOver(100).isParallel   // Returns false

  (1 <> 200).stream.parallelIfOver(100).isParallel  // Returns true
Inherited from
_parallel
Source
_parallel.scala
def parallelWithPriority(p: J.Priority, parallelism: Int): Stream.Flow[A]

Parallel with Priority

This is a very expensive operation, because it creates a custom thread pool. It is only suitable for long-running streams

   (1 <> 100).stream.parallelWithPriority(MIN, 4).foreach(v => ())

   (1 <> 100).stream.parallelWithPriority(MAX).foreach(v => ())

   (1 <> 100).stream.parallelWithPriority(J.Priority(5), 4).foreach(v => ())

Note: parallelism determines how many parallel threads are allowed. Default value is CPU core count minus 1

Inherited from
_parallel
Source
_parallel.scala
inline def partition(p: A => Boolean, more: A => Boolean*): Stream[Stream[A]]

Predicate grouping

All stream elements are grouped by given predicates, which are applied in sequence. Thus if an element is accepted into a group, it will not be evaluated by the rest of the filters.

The resulting stream size will be equal to the number of predicates plus one. The last group will hold spill over elements, not accepted by any predicate. Groups can be empty.

val (odd,even) = (1 <> 10).stream.partition(_ % 2 == 1).tuple2

odd.tp
even.tp

// Output
Stream(1, 3, 5, 7, 9)
Stream(2, 4, 6, 8, 10)


// Age groups
(1 <> 80).stream.partition(_ <= 12, _ in 13 <> 19, _ < 30, _ in 30 <> 40, _ < 50, _ < 65).print

-------------------------------------------------------------------
?
-------------------------------------------------------------------
Stream(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
Stream(13, 14, 15, 16, 17, 18, 19)
Stream(20, 21, 22, 23, 24, 25, 26, 27, 28, 29)
Stream(30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40)
Stream(41, 42, 43, 44, 45, 46, 47, 48, 49)
Stream(50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64)
Stream(65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80)
-------------------------------------------------------------------
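
The first-match rule and the spill-over group described above can be sketched on a plain List. This is a standard-library illustration under assumed names (partitionBy, preds), not the Scalqa implementation.

```scala
object PartitionSketch {
  // Hypothetical helper: each element goes to the group of the first predicate
  // that accepts it; elements accepted by none go to the extra last group.
  def partitionBy[A](xs: List[A])(preds: (A => Boolean)*): List[List[A]] = {
    // One builder per predicate plus one for the spill-over group.
    val groups = Vector.fill(preds.size + 1)(List.newBuilder[A])
    for (x <- xs) {
      val i = preds.indexWhere(p => p(x)) // -1 when no predicate accepts x
      groups(if (i < 0) preds.size else i) += x
    }
    groups.map(_.result()).toList
  }

  def main(args: Array[String]): Unit = {
    // Two groups: odd numbers, then everything else.
    println(partitionBy((1 to 10).toList)(_ % 2 == 1))
    // 3 satisfies both predicates but lands only in the first group.
    println(partitionBy(List(1, 3, 7))(_ > 2, _ > 0))
  }
}
```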
Inherited from
_group
Source
_group.scala
inline def peek[U](f: A => U): Stream[A]

Inspect

The given function will be run with every passing stream element.

  (1 <> 5).stream.peek(_.tp).drain

  // Output
  1
  2
  3
  4
  5
Inherited from
_peek
Source
_peek.scala
inline def peekEmpty[U](f: => U): Stream[A]

Peek empty

The given function is executed once, only if stream is empty

  (1 <> 10).stream.drop(_ > 0).peekEmpty("Stream is empty".tp).drain

  // Output
  Stream is empty
Inherited from
_peek
Source
_peek.scala
inline def peekEnd[U](f: (Int, Time.Length) => U): Stream[A]

Peek end

The given function is executed once, when stream is exhausted

The function receives the total element count and the Time.Length it took for all elements to pass

  (1 <> 10).stream
    .peek(_ => J.sleep(100.Millis))
    .peekEnd((cnt,time) => "Elements: "+cnt+"  total time: "+time.tag tp())
    .drain

  // Output
  Elements: 10  total time: 0.904106700 sec

Note: This will not run for empty streams

Inherited from
_peek
Source
_peek.scala
inline def peekEvents[U](f: Stream.Custom.Event => U): Stream[A]

Custom events

Allows setting up multiple Stream.Custom.Event monitoring events

 (1 <> 1000).stream
   .peek(_ => J.sleep(5.Millis))
   .peekEvents(e => {
     e.onBeforeFirst(t   => "Started at: "+ t.dayTime.tag tp())
     e.onEvery(1.Second, (c,t) => "  Processed "+c+" in "+t.tag tp())
     e.onAfterLast((c,t) => "Finished in: "+ t.tag + ",  Element count: " + c tp())
   })
   .drain

 // Output

 Started at: 14:05:39.333
   Processed 187 in 1.018583400 sec
   Processed 371 in 2.020508100 secs
   Processed 557 in 3.021843300 secs
   Processed 743 in 4.023837400 secs
   Processed 928 in 5.026982 secs
 Finished in: 5.411673300 secs, Element count: 1000
Inherited from
_peek
Source
_peek.scala
inline def peekIndexed[U](f: (Int, A) => U, start: Int): Stream[A]

Indexed peek

The given function will be executed with every passing element and its index.

  ('a' <> 'f').stream.peekIndexed((i,c) => (""+i+" : "+c).tp, 1).drain

  // Output
  1 : a
  2 : b
  3 : c
  4 : d
  5 : e
  6 : f

Note: By default indexing starts with 0, but the starting value can be specified

Inherited from
_peek
Source
_peek.scala

Custom monitor

Adds a pre-built Stream.Custom.Event.Monitor

If passed monitor tests to be void (.isEmpty), the operation is ignored

Inherited from
_peek
Source
_peek.scala
inline def peekStart[U](f: Time => U): Stream[A]

Peek start

The given function is executed once, just before the first element is about to pass.

  ('a' <> 'f').stream.peekStart(time => ("Started at: "+time).tp).drain

Note: This will not run for empty streams

Inherited from
_peek
Source
_peek.scala
inline def raw(using s: Specialized.Primitive[A]): s.Stream

Specialize

Converts the current stream into one specialized on the underlying primitive type. If the stream is already specialized, the conversion is a simple cast.

   val s  : Stream[Int]     = 1 <> 10

   val ss : Int.Stream = s.raw

Note: If underlying type is not primitive, the method will not compile

Inherited from
_mutate
Source
_mutate.scala
inline def ref: Stream[A]

Generalize

If stream is specialized it will be up-cast to general Val.Stream type, and further operations will be general (unless they are specialized, like map)

  val special : Int.Pack  = (1 <> 10).stream.pack

  val general : Pack[Int] = (1 <> 10).stream.ref.pack

  special.getClass.tp // Prints class scalqa.lang.int.g.Pack

  general.getClass.tp // Prints class scalqa.val.pack.z.ArrayPack

Note: This is a true zero cost operation. It does not change byte code (only compiler context)

Inherited from
_mutate
Source
_mutate.scala
inline def repeat(times: Int): Stream[A]

Repeat elements

Creates a new Stream where each element from the current Stream is repeated the given number of times

 (0 <> 2).stream.repeat(3).tp

 // Output
 Stream(0, 0, 0, 1, 1, 1, 2, 2, 2)
Inherited from
_extend
Source
_extend.scala
inline def replaceSequence(seq: Stream[A], to: Stream[A]): Stream[A]
Inherited from
_mutate
Source
_mutate.scala
inline def replaceSequenceBy[B](f: A => B, seq: Stream[B], to: Stream[A]): Stream[A]
Inherited from
_mutate
Source
_mutate.scala
inline def reverse: Stream[A]

Reverse order

Re-arranges all elements in reverse order

('A' <> 'F').stream.reverse.tp  // Prints Stream(F, E, D, C, B, A)
Inherited from
_mutate
Source
_mutate.scala
inline def reverseEvery(size: Int): Stream[A]

Reverse order in segments

Reverses order of elements within segments of fixed size

(1 <> 15).stream.reverseEvery(5).tp

(1 <> 15).stream.reverseEvery(5).reverseEvery(3).reverseEvery(7).tp

// Output
Stream(5, 4, 3, 2, 1, 10, 9, 8, 7, 6, 15, 14, 13, 12, 11)

Stream(7, 2, 1, 10, 5, 4, 3, 12, 11, 6, 15, 14, 9, 8, 13)

Use Case: Predefined Shuffle

For testing purposes it is often required to get elements in random order. However, the order cannot be completely random if we want to replicate bugs

reverseEvery can shuffle elements in a predefined order which looks random
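
The predefined shuffle can be approximated with standard-library collections, as in the sketch below. It is an illustration under an assumed helper name, not Scalqa code: the list is cut into fixed-size segments and each segment is reversed. Chaining calls with different sizes gives an order that looks random but is fully repeatable.

```scala
object ReverseEverySketch {
  // Hypothetical helper: reverses elements within consecutive segments of the given size.
  def reverseEvery[A](xs: List[A], size: Int): List[A] =
    xs.grouped(size).flatMap(_.reverse).toList

  def main(args: Array[String]): Unit = {
    val once = reverseEvery((1 to 15).toList, 5)
    println(once)
    // Same chain as in the example above: sizes 5, then 3, then 7.
    println(reverseEvery(reverseEvery(once, 3), 7))
  }
}
```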

Inherited from
_mutate
Source
_mutate.scala
inline def shuffle: Stream[A]

Randomize order

Re-arranges elements in random order

Note: "reverseEvery" might be a better choice if repeatable randomness is needed

Inherited from
_mutate
Source
_mutate.scala
inline def sliding(size: Int, step: Int): Stream[Stream[A]]

Sliding group view

Example: group size 3 with step 1

 ('a' <> 'g').stream.sliding(3).print

 // Output
 ----------
 ?
 ----------
 Stream(a, b, c)
 Stream(b, c, d)
 Stream(c, d, e)
 Stream(d, e, f)
 Stream(e, f, g)
 ----------

Example: group size 4 with step 2

 ('a' <> 'g').stream.sliding(4,2).print

 // Output
 -------------
 ?
 -------------
 Stream(a, b, c, d)
 Stream(c, d, e, f)
 Stream(e, f, g)
 -------------
Inherited from
_group
Source
_group.scala
inline def sort(using o: Ordering[A]): Stream[A]

Sort

Sorts stream elements with given Ordering

  Stream(5, 1, 4, 2, 3).sort.tp  // Prints Stream(1, 2, 3, 4, 5)
Inherited from
_order
Source
_order.scala
inline def sortBy[B, C, D](f1: A => B, f2: A => C, f3: A => D)(using Ordering[B], Ordering[C], Ordering[D]): Stream[A]

Sort by three properties

Sorts the stream on the first property, then, if indeterminate, on the second, and so on

Inherited from
_order
Source
_order.scala
inline def sortBy[B, C](f1: A => B, f2: A => C)(using Ordering[B], Ordering[C]): Stream[A]

Sort by two properties

Sorts the stream on the first property and then, if indeterminate, on the second
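
For comparison, the same two-level ordering can be expressed with the Scala standard library by sorting on a tuple of properties. The sketch below does not use Scalqa; the Person type and the sortByTwo helper are made up for illustration.

```scala
object SortBySketch {
  // Hypothetical element type used only for this example.
  final case class Person(name: String, age: Int)

  // Orders by age first; people of equal age are ordered by name.
  def sortByTwo(ps: List[Person]): List[Person] =
    ps.sortBy(p => (p.age, p.name))

  def main(args: Array[String]): Unit =
    sortByTwo(List(Person("Bob", 30), Person("Al", 30), Person("Cy", 25))).foreach(println)
}
```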

Inherited from
_order
Source
_order.scala
inline def sortBy[B](f: A => B)(using o: Ordering[B]): Stream[A]

Sort by property

Sorts stream of elements based on a single property

  Stream("aaaa", "bb", "ccc", "d").sortBy(_.length).tp

  // Output
  Stream(d, bb, ccc, aaaa)
Inherited from
_order
Source
_order.scala
inline def sortReversed(using o: Ordering[A]): Stream[A]

Sort reversed

Reverse sorts stream elements with given Ordering

  Stream(5, 1, 4, 2, 3).sortReversed.tp  // Prints Stream(5, 4, 3, 2, 1)
Inherited from
_order
Source
_order.scala
inline def splitAt(positions: Int*): Stream[Stream[A]]

Positional split

Splits Stream at specified positions

val (s1,s2,s3) = (0 <> 20).stream.splitAt(5, 15).tuple3

s1.tp   // Prints Stream(0, 1, 2, 3, 4)
s2.tp   // Prints Stream(5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
s3.tp   // Prints Stream(15, 16, 17, 18, 19, 20)
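
A minimal standard-library sketch of the same positional split is shown here. The helper name splitAtAll is hypothetical, and the sketch assumes the positions are given in ascending order and lie within the list; how Scalqa treats other inputs is not covered.

```scala
object SplitAtSketch {
  // Hypothetical helper: cuts the list at each position, yielding positions.size + 1 parts.
  // Assumes positions are ascending and within 0..xs.size.
  def splitAtAll[A](xs: List[A], positions: Int*): List[List[A]] = {
    val bounds = (0 +: positions) :+ xs.size
    // Each pair of neighbouring bounds delimits one part.
    bounds.zip(bounds.tail).map { case (from, until) => xs.slice(from, until) }.toList
  }

  def main(args: Array[String]): Unit =
    splitAtAll((0 to 20).toList, 5, 15).foreach(println)
}
```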

Note. The same could be accomplished with readStream

val s3 = (0 <> 20).stream
val s1 = s3.readStream(5)
val s2 = s3.readStream(10)
Inherited from
_group
Source
_group.scala
inline def synchronize: Stream[A]

Synchronize access

Nothing fancy, just a convenience "synchronized" wrapper

 val nonSyncStream: Stream[Int] = (0 <>> 10000).stream

 (1 <> 10000).stream.parallel.map(_ => nonSyncStream.read ).stream.sort.takeDuplicates.count.tp  // Prints a count between 0 and a few hundred

 val syncStream: Stream[Int] = (0 <>> 10000).stream.synchronize

 (1 <> 10000).stream.parallel.map(_ => syncStream.read ).stream.sort.takeDuplicates.count.tp    // Prints 0
Inherited from
_mutate
Source
_mutate.scala
inline def take(f: A => Boolean): Stream[A]

Main filter

Only takes Stream elements satisfying the given function

  (0 <>> 10).stream.take(_ > 5).tp

  // Output
  Stream(6, 7, 8, 9)

Note: The traditional filter method is also available and can be used, but take is preferable in most cases.

Inherited from
_take
Source
_take.scala
inline def TAKE(f: A => Boolean): Stream[A]

Heavy filter

Filters Stream elements according to given function

TAKE is functionally equivalent to take, but is fully inlined. It makes compiled code larger, but guarantees the best possible performance on large streams.

Inherited from
_take
Source
_take.scala
inline def takeDuplicates: Stream[A]

Duplicates filter

Takes only elements equal to the element passed in the prior position

Note: To generally get all duplicates, the stream must be sorted to arrange them in sequence

   Stream(1,1,2,3,3,4,5,5,5).takeDuplicates.tp

   // Output
   Stream(1, 3, 5, 5)
Inherited from
_take
Source
_take.scala
inline def takeDuplicatesBy[B](f: A => B): Stream[A]

Mapped duplicates filter

Takes only elements which evaluate to the same value as the element passed in the prior position

Note: To generally get all duplicates, the stream must be sorted by the mapping function

  (0 <> 10).stream.takeDuplicatesBy(_ / 2).tp

  // Output
  Stream(1, 3, 5, 7, 9)
Inherited from
_take
Source
_take.scala
inline def takeEvery(nTh: Int): Stream[A]

Every Nth element filter

Only lets through every nTh element

  (1 <> 20).stream.takeEvery(4).tp   // Prints: Stream(4, 8, 12, 16, 20)
Inherited from
_take
Source
_take.scala
inline def takeFirst(n: Int): Stream[A]

Head filter

Only takes given number of first elements

  (1 <> 10).stream.takeFirst(3).tp  // Prints  Stream(1, 2, 3)
Inherited from
_take
Source
_take.scala
inline def takeIndexed(f: (Int, A) => Boolean, start: Int): Stream[A]

Indexed filter

Only lets through elements satisfying the given function, which also accepts the element's sequential index

  ('a' <> 'z').stream.takeIndexed((i, _) => i >= 2 && i <= 7, 1).tp

  // Output
  Stream(b, c, d, e, f, g)

Note: By default indexing starts from 0, but starting value can also be explicitly specified.

Inherited from
_take
Source
_take.scala
inline def takeLast(n: Int): Stream[A]

Tail filter

Only takes given number of elements coming last

  (1 <> 10).stream.takeLast(3).tp  // Prints  Stream(8, 9, 10)

Note: This method will block on unlimited streams
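
Why a tail filter cannot finish on an unlimited source is easy to see in a sketch: nothing can be emitted until the input is exhausted. The code below uses only the standard library and a hypothetical helper name; it is not how the library necessarily implements takeLast.

```scala
import scala.collection.mutable

// Standard-library sketch (an assumption, not Scalqa code):
// keep a sliding window of at most n elements while draining the source.
def takeLastSketch[A](source: Iterator[A], n: Int): List[A] =
  val window = mutable.Queue.empty[A]
  for x <- source do
    window.enqueue(x)
    if window.size > n then window.dequeue()  // forget the oldest element
  window.toList                               // reached only when the source ends

val lastThree = takeLastSketch((1 to 10).iterator, 3)  // List(8, 9, 10)
```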

Inherited from
_take
Source
_take.scala
inline def takeOnly(v: A): Stream[A]

Single value filter

Only lets through elements equal to the specified value

  (0 <>> 10).stream.takeOnly(5).tp

  // Output
  Stream(5)

Note: takeOnly is more efficient than general filter ".take(_ == value)", because there is no function involved.

Inherited from
_take
Source
_take.scala
inline def takeRange(i: Int.Range): Stream[A]

Range filter

Only allows elements within the specified sequential range

  ('a' <> 'z').stream.takeRange(1 <> 7).tp

  // Output
  Stream(b, c, d, e, f, g, h)

Note: Range indexing starts from 0

Inherited from
_take
Source
_take.scala
inline def takeType[B](using t: scala.reflect.ClassTag[B]): Stream[B]

Type filter

Only lets through elements of the specified type

  Stream(1, '2', "3", new Object(), 0.0).takeType[String].tp  // Prints: Stream(3)
Inherited from
_take
Source
_take.scala
inline def takeValues(v: Stream[A]): Stream[A]

Multi value filter

Takes only provided set of values

    ('a' <> 'z').stream.takeValues('z','x','b').tp   // Prints Stream('b','x','z')

    ('a' <> 'z').stream.takeValues('b' <> 'f').tp    // Prints Stream('b','c','d','e','f')

Note: takeValues is macro optimized when given value tuples sized from 2 to 5

Inherited from
_take
Source
_take.scala
inline def takeValuesBy[B](f: A => B, v: Stream[B]): Stream[A]

Mapped multi value filter

Takes only elements whose mapped value is among the provided values

  (0 <>> 10).stream.takeValuesBy(_ % 5, (1,3) ).tp

  // Output
  Stream(1, 3, 6, 8)

Note: takeValuesBy is macro optimized when given value tuples sized from 2 to 5

Inherited from
_take
Source
_take.scala
inline def takeWhile(f: A => Boolean): Stream[A]

Conditional head filter

Only takes first consecutive elements satisfying the condition

  def stream = (1 <> 5).stream ++ (1 <> 5)

  stream.tp                     // Prints Stream(1, 2, 3, 4, 5, 1, 2, 3, 4, 5)

  stream.takeWhile(_ <= 3).tp    // Prints Stream(1, 2, 3)

Note: Everything starting from the first non compliant element will be discarded (including later compliant elements)
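
The difference between a conditional head filter and a general filter can be shown with the standard library, whose takeWhile and filter are used here as stand-ins for the stream methods (an assumption about equivalence, not a statement about the library's internals).

```scala
// Standard-library illustration (stand-ins, not Scalqa code).
val repeated = List(1, 2, 3, 4, 5, 1, 2, 3, 4, 5)

// Stops at the first element that fails the condition.
val headOnly = repeated.takeWhile(_ <= 3)  // List(1, 2, 3)

// Tests every element, so later compliant elements are kept as well.
val allMatching = repeated.filter(_ <= 3)  // List(1, 2, 3, 1, 2, 3)
```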

Inherited from
_take
Source
_take.scala
inline def transpose[B](using f: A => Stream[B]): Stream[Stream[B]]

Transpose

Transposes a matrix, so that rows become columns

 def stream : Stream[Stream[Int]] = Stream(11 <> 15,
                             List(21, 22, 23, 24, 25),
                             Vector(31, 32, 33, 34, 35))

 stream.print

 stream.transpose.print

 // Output
 ---------------------
 ?
 ---------------------
 Stream(11, 12, 13, 14, 15)
 Stream(21, 22, 23, 24, 25)
 Stream(31, 32, 33, 34, 35)
 ---------------------

 -------------
 ?
 -------------
 Stream(11, 21, 31)
 Stream(12, 22, 32)
 Stream(13, 23, 33)
 Stream(14, 24, 34)
 Stream(15, 25, 35)
 -------------
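
For comparison, the same row-to-column swap can be reproduced with the standard library's List.transpose. This is only an analogy and assumes all rows have the same length.

```scala
// Standard-library analogy (not Scalqa code); rows must be equally long.
val matrixRows: List[List[Int]] = List(
  (11 to 15).toList,
  List(21, 22, 23, 24, 25),
  Vector(31, 32, 33, 34, 35).toList
)

// Element j of row i becomes element i of column j.
val matrixColumns: List[List[Int]] = matrixRows.transpose
```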
Inherited from
_mutate
Source
_mutate.scala
inline def unfold(f: Stream[A] => Opt[A]): Stream[A]

Lazy generator

Lazily unfolds next stream value with a function taking all prior values

If the given function returns void option, the stream ends

 // Unfolding Fibonacci Sequence

 (0 <> 1).stream.unfold(_.takeLast(2).sum).takeFirst(20).tp

 // Output
 Stream(0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181)

Note: Method .takeFirst(20) is needed, because otherwise the stream will never end and would be hard to print out
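
The unfolding idea, where each new value is computed from all values produced so far and an empty result ends the stream, might be sketched with a standard-library LazyList. The name unfoldSketch and the use of Option in place of Opt are assumptions for illustration.

```scala
// Standard-library sketch (an assumption, not Scalqa code).
def unfoldSketch[A](seed: Seq[A])(f: Seq[A] => Option[A]): LazyList[A] =
  // Produce one value at a time, handing f everything generated so far.
  def loop(prior: Vector[A]): LazyList[A] =
    f(prior) match
      case Some(next) => next #:: loop(prior :+ next)  // tail is evaluated lazily
      case None       => LazyList.empty                // an empty result ends the stream
  LazyList.from(seed) #::: loop(seed.toVector)

// Unlimited Fibonacci sequence; only the requested prefix is ever computed.
val fibonacci = unfoldSketch(Seq(0, 1))(prior => Some(prior.takeRight(2).sum))
val firstTen  = fibonacci.take(10).toList  // List(0, 1, 1, 2, 3, 5, 8, 13, 21, 34)
```

Because the sequence is lazy, limiting it before printing plays the same role as .takeFirst(20) in the example above.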

Inherited from
_extend
Source
_extend.scala
inline def unzip[B,C](using f: A => (B, C)): (Stream[B], Stream[C])

Unzips stream in two

Unzips a stream of tupled values in two

 val pairs = ('a' <> 'g').stream.zipValue(_.toUpper).pack

 pairs.stream.tp  // Prints Stream((a,A), (b,B), (c,C), (d,D), (e,E), (f,F), (g,G))

 val (left, right) = pairs.stream.unzip

 left.tp   // Prints Stream(a, b, c, d, e, f, g)

 right.tp  // Prints Stream(A, B, C, D, E, F, G)
Inherited from
_zip
Source
_zip.scala
inline def zip[B](that: Stream[B]): Stream[(A, B)]

Merge

Merges two streams in one, creating tuples of corresponding elements

  (1 <> 100).stream.zip('A' <> 'D').tp  // Prints Stream((1,A), (2,B), (3,C), (4,D))

If one of the streams is shorter, the excess elements are lost

Inherited from
_zip
Source
_zip.scala
inline def zipAll[B](that: Stream[B], thisDflt: Opt[A], thatDflt: Opt[B]): Stream[(A, B)]

Merge stream

Merges two streams in one, creating tuples of corresponding elements

If one of the streams is shorter, the provided defaults are used. If the default is not available, operation fails

  ('a' <> 'f').stream.zipAll('A' <> 'H', '?', '?').tp

  // Output
  Stream((a,A), (b,B), (c,C), (d,D), (e,E), (f,F), (?,G), (?,H))
Value Params
that

the stream to merge with this

thatDflt

if that Stream has fewer elements, thatDflt will be used to fill the voids. Fails if thatDflt is required, but not available

thisDflt

if this Stream has fewer elements, thisDflt will be used to fill the voids. Fails if thisDflt is required, but not available
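
The padding behaviour is similar to zipAll in the Scala standard library, shown here as an analogy only. Unlike the method documented above, the standard-library version always requires both defaults, so the failure case does not arise in it.

```scala
// Standard-library analogy (not Scalqa code).
val lowerCase = ('a' to 'f').toList
val upperCase = ('A' to 'H').toList

// The shorter side is padded with its default until the longer side ends.
val padded = lowerCase.zipAll(upperCase, '?', '?')
// padded ends with ('f','F'), ('?','G'), ('?','H')
```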

Inherited from
_zip
Source
_zip.scala
inline def zipFoldAs[B](start: B, f: (B, A) => B): Stream[(A, B)]

Merges current folding value

(1 <> 7).stream.zipFoldAs(0L)(_ + _).print

// "Running Total" Output
-- --
?  ?
-- --
1  1
2  3
3  6
4  10
5  15
6  21
7  28
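
The running total above can be reproduced with scanLeft from the standard library. The sketch below, with the hypothetical name zipFoldSketch, pairs each element with the fold value obtained after that element has been applied.

```scala
// Standard-library sketch (an assumption, not Scalqa code).
def zipFoldSketch[A, B](xs: List[A], start: B)(f: (B, A) => B): List[(A, B)] =
  // scanLeft yields start, f(start, x1), f(f(start, x1), x2), ...
  // Dropping the initial value aligns each fold result with its element.
  xs.zip(xs.scanLeft(start)(f).drop(1))

val runningTotal = zipFoldSketch((1 to 7).toList, 0L)(_ + _)
// List((1,1), (2,3), (3,6), (4,10), (5,15), (6,21), (7,28))
```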
Inherited from
_zip
Source
_zip.scala
inline def zipIndex(start: Int): Stream[(Int, A)]

Merge number

Creates a new Stream with elements paired with their sequential position, counted from the given start value

Note: Index is the first element in the resulting tuples.

   ('A' <> 'F').stream.zipIndex('A'.toInt).tp  // Prints Stream((65,A), (66,B), (67,C), (68,D), (69,E), (70,F))
Value Params
start

index initial value

Inherited from
_zip
Source
_zip.scala
inline def zipIndex: Stream[(Int, A)]

Merge index

Creates a new Stream with elements paired with their sequential position, starting at 0

  ('A' <> 'F').stream.zipIndex.tp

  // Output

  Stream((0,A), (1,B), (2,C), (3,D), (4,E), (5,F))

Note: Index is the first element in the resulting tuples

Inherited from
_zip
Source
_zip.scala
inline def zipKey[B](f: A => B): Stream[(B, A)]

Merge property first

Creates a new Stream with elements paired with their property, defined by given function

The paired value is in the first tuple position

  ('A' <> 'F').stream.zipKey(_.toInt).tp  // Prints Stream((65,A), (66,B), (67,C), (68,D), (69,E), (70,F))
Inherited from
_zip
Source
_zip.scala
inline def zipNext: Stream[(A, Opt[A])]

Merge with next

Creates new Stream with elements paired with the optional next element

  (1 <> 5).stream.zipNext.tp  // Prints Stream((1,Opt(2)), (2,Opt(3)), (3,Opt(4)), (4,Opt(5)), (5,Opt(VOID)))
Inherited from
_zip
Source
_zip.scala
inline def zipPrior: Stream[(Opt[A], A)]

Merge with prior

Creates new Stream with elements paired with the optional prior element

  (1 <> 5).stream.zipPrior.tp  // Prints Stream((Opt(VOID),1), (Opt(1),2), (Opt(2),3), (Opt(3),4), (Opt(4),5))
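
Pairing with a neighbour, whether the prior element as here or the next element as in zipNext, amounts to zipping a sequence with a shifted copy of itself. The sketch below uses standard-library Option in place of Opt, and both helper names are hypothetical.

```scala
// Standard-library sketch (an assumption, not Scalqa code).

// Shift left by one and pad the end: the last element has no successor.
def zipNextSketch[A](xs: List[A]): List[(A, Option[A])] =
  val nexts: List[Option[A]] = xs.drop(1).map(Some(_)) :+ None
  xs.zip(nexts)

// Shift right by one and pad the front: the first element has no predecessor.
def zipPriorSketch[A](xs: List[A]): List[(Option[A], A)] =
  val priors: List[Option[A]] = None :: xs.map(Some(_))
  priors.zip(xs)
```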
Inherited from
_zip
Source
_zip.scala
inline def zipValue[B](f: A => B): Stream[(A, B)]

Merge property

Creates a new Stream with elements paired with their property, defined by given function

The paired value is in the second tuple position

  ('A' <> 'F').stream.zipValue(_.toInt).tp  // Prints Stream((A,65), (B,66), (C,67), (D,68), (E,69), (F,70))
Inherited from
_zip
Source
_zip.scala