
Scala Data Pipelines @ Spotify






Slide 0

Scala Data Pipelines @ Spotify
Neville Li
@sinisa_lyh


Slide 1

Who am I?
‣ Spotify NYC since 2011
‣ Formerly Yahoo! Search
‣ Music recommendations
‣ Data infrastructure
‣ Scala since 2013


Slide 2

Spotify in numbers
• Started in 2006, 58 markets
• 75M+ active users, 20M+ paying
• 30M+ songs, 20K new per day
• 1.5 billion playlists
• 1 TB logs per day
• 1200+ node Hadoop cluster
• 10K+ Hadoop jobs per day


Slide 3

Music recommendation @ Spotify
• Discover Weekly
• Radio
• Related Artists
• Discover Page


Slide 4

Recommendation systems


Slide 5

A little teaser

Crunch: CombineFns are used to represent the associative operations…
PGroupedTable<K,V>::combineValues(CombineFn<K,V> combineFn, CombineFn<K,V> reduceFn)

Scalding: reduce with fn which must be associative and commutative…
Grouped[K, +V]::reduce[U >: V](fn: (U, U) => U)

Spark: Merge the values for each key using an associative reduce function…
PairRDDFunctions[K, V]::reduceByKey(fn: (V, V) => V)


Slide 6

Monoid! Enables map-side reduce.
Actually it's a semigroup.
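Because the operation only needs to be associative (a semigroup; a monoid additionally has an identity element), each partition can be pre-aggregated independently and the partial results merged later, with the same answer as one global reduce. A minimal plain-Scala sketch of that idea, with no framework involved and names of my own choosing:

```scala
object MapSideReduce {
  // An associative "plus" (semigroup operation)
  def plus(x: Int, y: Int): Int = x + y

  // Pre-aggregate each partition (the map-side combine), then merge
  // the partial results (the reduce side). Associativity guarantees
  // the answer equals a single global reduce.
  def reducePartitioned(data: List[Int], partitionSize: Int): Int =
    data.grouped(partitionSize) // simulate mapper shards
      .map(_.reduce(plus))      // map-side combine
      .reduce(plus)             // final reduce
}
```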


Slide 7

One more teaser

The linear equation in Alternating Least Squares (ALS) matrix factorization:
xᵤ = (YᵀY + Yᵀ(Cᵘ − I)Y)⁻¹ Yᵀ Cᵘ p(u)

// YtY
vectors.map { case (id, v) => (id, v * v) }.map(_._2).reduce(_ + _)

// YtCuIY
ratings.keyBy(fixedKey).join(outerProducts)
  .map { case (_, (r, op)) =>
    (solveKey(r), op * (r.rating * alpha))
  }.reduceByKey(_ + _)

// YtCupu
ratings.keyBy(fixedKey).join(vectors)
  .map { case (_, (r, v)) =>
    val (cui, pui) = (r.rating * alpha + 1, if (r.rating > 0.0) 1.0 else 0.0)
    (solveKey(r), v * (cui * pui))
  }.reduceByKey(_ + _)

http://www.slideshare.net/MrChrisJohnson/scala-data-pipelines-for-music-recommendations


Slide 8

Success story
• Mid 2013: 100+ Python Luigi M/R jobs, few tests
• 10+ new hires since, most fresh grads
• Few with Java experience, none with Scala
• Now: 300+ Scalding jobs, 400+ tests
• More ad-hoc jobs untracked
• Spark also taking off


Slide 9

First 10 months ……


Slide 10

Activity over time


Slide 11

Guess how many jobs written by yours truly?


Slide 12

Performance vs. Agility https://nicholassterling.wordpress.com/2012/11/16/scala-performance/


Slide 13

Let’s dive into something technical


Slide 14

To join or not to join?

val streams: TypedPipe[(String, String)] = _ // (track, user)
val tgp: TypedPipe[(String, String)] = _     // (track, genre)

streams
  .join(tgp)
  .values // (user, genre)
  .group
  .mapValueStream(vs => Iterator(vs.toSet)) // reducer-only


Slide 15

Hash join

val streams: TypedPipe[(String, String)] = _ // (track, user)
val tgp: TypedPipe[(String, String)] = _     // (track, genre)

streams
  .hashJoin(tgp.forceToDisk) // tgp replicated to all mappers
  .values // (user, genre)
  .group
  .mapValueStream(vs => Iterator(vs.toSet)) // reducer-only


Slide 16

CoGroup

val streams: TypedPipe[(String, String)] = _ // (track, user)
val tgp: TypedPipe[(String, String)] = _     // (track, genre)

streams
  .cogroup(tgp) { case (_, users, genres) =>
    users.map((_, genres.toSet))
  } // (track, (user, genres))
  .values // (user, genres)
  .group
  .reduce(_ ++ _) // map-side reduce!


Slide 17

CoGroup

val streams: TypedPipe[(String, String)] = _ // (track, user)
val tgp: TypedPipe[(String, String)] = _     // (track, genre)

streams
  .cogroup(tgp) { case (_, users, genres) =>
    users.map((_, genres.toSet))
  } // (track, (user, genres))
  .values // (user, genres)
  .group
  .sum // SetMonoid[Set[T]] from Algebird

* sum[U >: V](implicit sg: Semigroup[U])


Slide 18

Key-value file as distributed cache

val streams: TypedPipe[(String, String)] = _ // (gid, user)
val tgp: SparkeyManager = _ // tgp replicated to all mappers

streams
  .map { case (track, user) =>
    (user, tgp.get(track).split(",").toSet)
  }
  .group
  .sum

SparkeyManager wraps DistributedCacheFile
https://github.com/spotify/sparkey


Slide 19

Joins and CoGroups
• Require shuffle and reduce step
• Some ops force everything to reducers, e.g. mapGroup, mapValueStream
• CoGroup more flexible for complex logic
• Scalding flattens a.join(b).join(c)… into MultiJoin(a, b, c, …)


Slide 20

Distributed cache
• Faster with off-heap binary files
• Building cache = more wiring
• Memory mapping may interfere with YARN
• E.g. 64GB nodes with 48GB for containers (no cgroup)
• 12 × 2GB containers each with 2GB JVM heap + mmap cache
• OOM and swap!
• Keep files small (< 1GB) or fall back to joins…


Slide 21

Analyze your jobs
• Concurrent Driven
• Visualize job execution
• Workflow optimization
• Bottlenecks
• Data skew


Slide 22

Not enough math?


Slide 23

Recommending tracks
• User listened to Rammstein - Du Hast
• Recommend 10 similar tracks
• 40-dimension feature vectors for tracks
• Compute cosine similarity between all pairs
• O(n) lookup per user where n ≈ 30M
• Try that with 50M users × 10 seed tracks each
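The similarity measure itself is cheap; the cost is the number of pairs. A small standalone sketch of cosine similarity over dense vectors (a helper of my own, not Spotify's actual code):

```scala
object Cosine {
  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  def norm(a: Array[Double]): Double = math.sqrt(dot(a, a))

  // Cosine similarity: 1.0 for parallel vectors, 0.0 for orthogonal ones
  def cosine(a: Array[Double], b: Array[Double]): Double =
    dot(a, b) / (norm(a) * norm(b))
}
```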


Slide 24

ANNOY - cheat by approximation
• Approximate Nearest Neighbors Oh Yeah
• Random projections and binary tree search
• Build index on a single machine
• Load in mappers via distributed cache
• O(log n) lookup

https://github.com/spotify/annoy
https://github.com/spotify/annoy-java


Slide 25

ANN Benchmark https://github.com/erikbern/ann-benchmarks


Slide 26

Filtering candidates
• Users don't like seeing artists/albums/tracks they already know
• But may forget what they listened to long ago
• 50M users × thousands of items each
• Over 5 years of streaming logs
• Need to update daily
• Need to purge old items per user


Slide 27

Options
• Aggregate all logs daily
• Aggregate last x days daily
• CSV of artist/album/track ids
• Bloom filters


Slide 28

Decayed value with cutoff
• Compute new user-item score daily
• Weighted on context, e.g. radio, search, playlist
• score′ = score + previous × 0.99
• half-life = log₀.₉₉ 0.5 ≈ 69 days
• Cut off at top 2000
• Items that users might remember seeing recently
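The daily update and cutoff above can be sketched as follows (a toy version; the names and the Map-based per-user representation are my own):

```scala
object DecayedScore {
  val decay = 0.99 // score' = today + previous * 0.99 → half-life ≈ 69 days

  // Daily update: today's raw score plus yesterday's decayed total
  def update(previous: Double, today: Double): Double =
    today + previous * decay

  // Keep only the top-n items per user after the daily update
  def topN(scores: Map[String, Double], n: Int): Map[String, Double] =
    scores.toSeq.sortBy(-_._2).take(n).toMap
}
```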


Slide 29

Bloom filters
• Probabilistic data structure
• Encodes a set of items with m bits and k hash functions
• No false negatives
• Tunable false positive probability
• Size proportional to capacity & FP probability
• Let's build one per user-{artists,albums,tracks}
• Algebird BloomFilterMonoid: z = all zero bits, + = bitwise OR
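A toy Bloom filter showing the monoid structure described above, with zero = all bits clear and plus = bitwise OR. This is not Algebird's implementation, and the hashing scheme is purely illustrative:

```scala
import scala.collection.immutable.BitSet

case class ToyBF(bits: BitSet, m: Int, k: Int) {
  // k bit positions per item, derived from the item's hash
  private def hashes(item: String): Seq[Int] =
    (0 until k).map(i => ((item, i).hashCode & Int.MaxValue) % m)

  def +(item: String): ToyBF = copy(bits = bits ++ hashes(item))

  // No false negatives: an inserted item always has all its bits set
  def contains(item: String): Boolean = hashes(item).forall(bits.contains)

  // Monoid plus: bitwise OR of the underlying bit sets
  def ++(that: ToyBF): ToyBF = copy(bits = bits | that.bits)
}

object ToyBF {
  def zero(m: Int, k: Int): ToyBF = ToyBF(BitSet.empty, m, k) // all zero bits
}
```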


Slide 30

Size versus max items & FP prob
• User-item distribution is uneven
• Assuming the same setting for all users
• # items << capacity → wasting space
• # items > capacity → high FP rate


Slide 31

Scalable Bloom Filter
• Growing sequence of standard BFs
• Increasing capacity and tighter FP probability
• Most users have few BFs
• Power users have many
• Serialization and lookup overhead


Slides 32-35

Scalable Bloom Filter (animation)
[Diagram: an item is inserted into a growing sequence of BFs; as each fills up it is marked "full" and a larger one is appended: n=1k → n=10k → n=100k → n=1m]


Slide 36

Opportunistic Bloom Filter
• Build n BFs of increasing capacity in parallel
• Up to << N max possible items
• Keep the smallest one with capacity > items inserted
• Expensive to build
• Cheap to store and look up
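The selection step can be sketched as: build every tier, count distinct insertions, keep the smallest adequate filter. The capacity tiers below are illustrative, not Spotify's actual settings:

```scala
object OpportunisticSelect {
  // Hypothetical capacity tiers, all built in parallel
  val capacities = Seq(1000, 10000, 100000, 1000000)

  // Keep the smallest filter whose capacity exceeds the number of
  // items actually inserted; fall back to the largest tier otherwise.
  def pick(itemsInserted: Int): Int =
    capacities.find(_ > itemsInserted).getOrElse(capacities.last)
}
```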


Slides 37-41

Opportunistic Bloom Filter (animation, inserting items)
[Diagram: each item is inserted into four BFs built in parallel; fill levels after the first batch: n=1k at 80%, n=10k at 8%, n=100k at 0.8%, n=1m at 0.08%]


Slides 42-46

Opportunistic Bloom Filter (animation continued)
[Diagram: n=1k full at 100%, n=10k at 70%, n=100k at 7%, n=1m at 0.7%]


Slides 47-51

Opportunistic Bloom Filter (animation continued)
[Diagram: n=1k full at 100%, n=10k full at 100%, n=100k at 60%, n=1m at 6%]


Slides 52-57

Opportunistic Bloom Filter (animation, final frame)
[Diagram: n=1k full at 100%, n=10k full at 100%, n=100k at 60% is kept, n=1m at 6% is under-utilized]


Slide 58

Want more scala.language.experimental?


Slide 59

Track metadata
• Label dump → content ingestion
• Third-party track genres, e.g. GraceNote
• Audio attributes, e.g. tempo, key, time signature
• Cultural data, e.g. popularity, tags
• Latent vectors from collaborative filtering
• Many sources for album, artist, user metadata too


Slide 60

Multiple data sources
• Big joins
• Complex dependencies
• Wide rows with few columns accessed
• Wasting I/O


Slide 61

Apache Parquet
• Pre-join sources into mega-datasets
• Store as Parquet columnar storage
• Column projection
• Predicate pushdown
• Avro within Scalding pipelines


Slide 62

Projection

pipe.map(a => (a.getName, a.getAmount))
versus
Parquet.project[Account]("name", "amount")

• Strings → unsafe and error prone
• No IDE auto-completion → finger injury
• my_fancy_field_name → .getMyFancyFieldName
• Hard to migrate existing code


Slide 63

Predicate

pipe.filter(a => a.getName == "Neville" && a.getAmount > 100)
versus
FilterApi.and(
  FilterApi.eq(FilterApi.binaryColumn("name"), Binary.fromString("Neville")),
  FilterApi.gt(FilterApi.floatColumn("amount"), 100f.asInstanceOf[java.lang.Float]))


Slide 64

Macro to the rescue
Code → AST → (pattern matching) → (recursion) → (quasi-quotes) → Code

Projection[Account](_.getName, _.getAmount)
Predicate[Account](x => x.getName == "Neville" && x.getAmount > 100)

https://github.com/nevillelyh/parquet-avro-extra
http://www.lyh.me/slides/macros.html


Slide 65

What else?
‣ Analytics
‣ Ads targeting, prediction
‣ Metadata quality
‣ Zeppelin
‣ More cool stuff in the works


Slide 66

And we’re hiring


Slide 67

The End
Thank You

