1
1
/**
2
- * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3
- * Copyright 2017-2018 Alexis Seigneurin.
4
- */
5
-
2
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3
+ * Copyright 2017-2018 Alexis Seigneurin.
4
+ */
6
5
package com .lightbend .kafka .scala .streams
7
6
8
7
import org .apache .kafka .streams .kstream ._
@@ -12,10 +11,9 @@ import org.apache.kafka.common.serialization.Serde
12
11
import ImplicitConversions ._
13
12
import FunctionConversions ._
14
13
15
-
16
14
/**
17
- * Wraps the Java class KGroupedStream and delegates method calls to the underlying Java object.
18
- */
15
+ * Wraps the Java class KGroupedStream and delegates method calls to the underlying Java object.
16
+ */
19
17
class KGroupedStreamS [K , V ](inner : KGroupedStream [K , V ]) {
20
18
21
19
def count (): KTableS [K , Long ] = {
@@ -24,47 +22,41 @@ class KGroupedStreamS[K, V](inner: KGroupedStream[K, V]) {
24
22
}
25
23
26
24
def count (store : String , keySerde : Option [Serde [K ]] = None ): KTableS [K , Long ] = {
27
- val materialized = keySerde.foldLeft(Materialized .as[K , java.lang.Long , KeyValueStore [Bytes , Array [Byte ]]](store))((m,serde)=> m.withKeySerde(serde))
25
+ val materialized = keySerde.foldLeft(Materialized .as[K , java.lang.Long , KeyValueStore [Bytes , Array [Byte ]]](store))(
26
+ (m, serde) => m.withKeySerde(serde)
27
+ )
28
28
29
29
val c : KTableS [K , java.lang.Long ] = inner.count(materialized)
30
30
c.mapValues[Long ](Long2long _)
31
31
}
32
32
33
- def reduce (reducer : (V , V ) => V ): KTableS [K , V ] = {
33
+ def reduce (reducer : (V , V ) => V ): KTableS [K , V ] =
34
34
inner.reduce((v1, v2) => reducer(v1, v2))
35
- }
36
-
37
- def reduce (reducer : (V , V ) => V ,
38
- materialized : Materialized [K , V , KeyValueStore [Bytes , Array [Byte ]]]): KTableS [K , V ] = {
39
35
36
+ def reduce (reducer : (V , V ) => V , materialized : Materialized [K , V , KeyValueStore [Bytes , Array [Byte ]]]): KTableS [K , V ] =
40
37
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
41
38
// works perfectly with Scala 2.12 though
42
39
inner.reduce(((v1 : V , v2 : V ) => reducer(v1, v2)).asReducer, materialized)
43
- }
44
-
45
- def reduce (reducer : (V , V ) => V ,
46
- storeName : String )(implicit keySerde : Serde [K ], valueSerde : Serde [V ]): KTableS [K , V ] = {
47
40
41
+ def reduce (reducer : (V , V ) => V , storeName : String )(implicit keySerde : Serde [K ],
42
+ valueSerde : Serde [V ]): KTableS [K , V ] =
48
43
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
49
44
// works perfectly with Scala 2.12 though
50
- inner.reduce(((v1 : V , v2 : V ) =>
51
- reducer(v1, v2)).asReducer,
52
- Materialized .as[K , V , KeyValueStore [Bytes , Array [Byte ]]](storeName)
45
+ inner.reduce(
46
+ ((v1 : V , v2 : V ) => reducer(v1, v2)).asReducer,
47
+ Materialized
48
+ .as[K , V , KeyValueStore [Bytes , Array [Byte ]]](storeName)
53
49
.withKeySerde(keySerde)
54
50
.withValueSerde(valueSerde)
55
51
)
56
- }
57
52
58
- def aggregate [VR ](initializer : () => VR ,
59
- aggregator : (K , V , VR ) => VR ): KTableS [K , VR ] = {
53
+ def aggregate [VR ](initializer : () => VR , aggregator : (K , V , VR ) => VR ): KTableS [K , VR ] =
60
54
inner.aggregate(initializer.asInitializer, aggregator.asAggregator)
61
- }
62
55
63
56
def aggregate [VR ](initializer : () => VR ,
64
- aggregator : (K , V , VR ) => VR ,
65
- materialized : Materialized [K , VR , KeyValueStore [Bytes , Array [Byte ]]]): KTableS [K , VR ] = {
57
+ aggregator : (K , V , VR ) => VR ,
58
+ materialized : Materialized [K , VR , KeyValueStore [Bytes , Array [Byte ]]]): KTableS [K , VR ] =
66
59
inner.aggregate(initializer.asInitializer, aggregator.asAggregator, materialized)
67
- }
68
60
69
61
def windowedBy (windows : SessionWindows ): SessionWindowedKStreamS [K , V ] =
70
62
inner.windowedBy(windows)
0 commit comments