Discussion:
spark-itemsimilarity runs several times slower from Mahout 0.11 onwards
Nikaash Puri
2016-04-27 13:14:56 UTC
Hi,

I’ve been working with LLR in Mahout for a while now, mostly using the SimilarityAnalysis.cooccurenceIDss function. I recently upgraded the Mahout libraries to 0.11, and subsequently also tried 0.12, and the same program now runs several times slower (at least 3x based on initial analysis).

Looking into the tasks more carefully and comparing 0.10 and 0.11 shows that the amount of shuffle in 0.11 is significantly higher, especially in the AtB step. This could be a reason for the drop in performance.

I am, however, working on Spark 1.2.0, so it's possible that this is causing the problem. It works fine with Mahout 0.10.

Any ideas why this might be happening?

Thank you,
Nikaash Puri
Dmitriy Lyubimov
2016-04-27 15:37:14 UTC
0.11 targets Spark 1.3+.

I don't have anything off the top of my head affecting A'B specifically,
but I think there were some changes affecting in-memory multiplication
(which is of course used in distributed A'B).

I'm not particularly familiar with, nor do I remember, the details of row
similarity off the top of my head; I really wish the original contributor
would comment on that. Trying to see if I can come up with anything useful
though.

What behavior do you see in this job -- CPU-bound or I/O-bound?

There are a few pointers to look at:

(1) I/O often exceeds the input size many times over, so spills are
inevitable. Tuning memory sizes and checking the Spark spill locations to
make sure the disks there are not slow is therefore critical. Also, I think
Spark 1.6 added a lot of flexibility in managing task/cache/shuffle memory
sizes; it may help in some unexpected way.

(2) Sufficient cache: many pipelines commit reused matrices into cache
(MEMORY_ONLY), which is the default Mahout algebra behavior, assuming there
is enough cache memory for only good things to happen. If there is not,
however, it will cause recomputation of results that were evicted (not
saying this is a known case for row similarity in particular). Make sure
this is not happening; for scatter-type exchanges it is especially bad. See
the sketch at the end of this message.

(3) A'B -- try to hack and play with the implementation in the AtB
(Spark-side) class. See if you can come up with a better arrangement.

(4) In-memory computations (the MMul class), if that's the bottleneck, can
in practice be quick-hacked with multithreaded multiplication and a bridge
to native solvers (netlib-java), at least for dense cases. This has been
found to improve the performance of distributed multiplications a bit. It
works best if you get 2 threads in the backend and all threads in the
frontend.

There are other known things that can improve multiplication speed over the
public Mahout version; I hope Mahout will improve on those in the future.
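
A minimal sketch of point (2), assuming the standard Samsara checkpoint API
and using drmA as a stand-in for whatever matrix the pipeline reuses (this
is not code lifted from the row-similarity implementation itself):

import org.apache.mahout.math.drm._

// Pin the reused matrix with an explicit cache hint instead of relying on
// the MEMORY_ONLY default, so evicted partitions spill to disk rather than
// being recomputed on the next use.
val drmACached = drmA.checkpoint(CacheHint.MEMORY_AND_DISK)

This only guards against recomputation; it does not reduce the shuffle
itself.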

-d
Pat Ferrel
2016-04-27 21:20:17 UTC
I have been using the same function through all those versions of Mahout. I’m running on newer versions of Spark 1.4-1.6.2. Using my datasets there has been no slowdown. I assume that you are only changing the Mahout version—leaving data, Spark, HDFS, and all config the same. In which case I wonder if you are somehow running into limits of your machine like memory? Have you allocated a fixed executor memory limit?

There has been almost no code change to item similarity. Dmitriy, do you know if the underlying AtB has changed? I seem to recall the partitioning was set to “auto” about 0.11. We were having problems with large numbers of small part files from Spark Streaming causing partitioning headaches as I recall. In some unexpected way the input structure was trickling down into partitioning decisions made in Spark.

The first thing I’d try is giving the job more executor memory; the second is to upgrade Spark. A 3x increase in execution time is a pretty big deal if it isn’t helped by these easy fixes, so can you share your data?
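
For reference, and only as a sketch since the right numbers depend entirely on your cluster, a fixed executor memory limit can be set on the SparkConf the driver builds before creating the Spark context (all values below are illustrative, not recommendations):

import org.apache.spark.SparkConf

// Illustrative settings; tune them to the actual cluster rather than copying.
val conf = new SparkConf()
  .set("spark.executor.memory", "8g")       // fixed per-executor memory limit
  .set("spark.executor.cores", "2")         // cores per executor
  .set("spark.default.parallelism", "30")   // baseline split count for shuffles

The same settings can be passed as spark-submit flags if the job is launched from the command line.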

Nikaash Puri
2016-04-28 03:31:49 UTC
Hi Pat, Dmitriy

Thanks so much. Will run some more experiments to validate the initial
outcomes. A Spark upgrade is definitely in the pipeline and will likely
solve some of these performance issues.

Pat, yup, the tests were conducted under identical code bases and data sets
other than the Mahout version change. I'm sorry, the data is sensitive so
sharing it won't be possible.

Also, as far as I can tell, spark-itemsimilarity now uses computeAtBzipped3
instead of computeAtBzipped for AtB. That change is meant to speed things
up, though, so I'm not sure it's relevant to this problem.

Thank you,
Nikaash Puri
Nikaash Puri
2016-04-28 13:02:32 UTC
Hi,

Ok, so interestingly enough, when I repartition my input data across indicators on the user IDs, I get a significant speedup. This is probably because shuffle goes down, since partitions holding the same user IDs are more likely to be co-located on the same nodes. What’s even more interesting is the behaviour as a function of the number of partitions.

Concretely, in my case I was using around 20 cores, so setting the number of partitions to 200 or more leads to greater shuffle and poorer performance. Setting the number of partitions to slightly more than the number of cores (30 in my case) gives significant speedups in the AtB calculations. Again, my guess is that shuffle is the reason.
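
A rough sketch of that repartitioning, with made-up variable names and assuming the raw interactions are (userID, itemID) pair RDDs before they become IndexedDatasets:

import org.apache.spark.HashPartitioner

val numPartitions = 30  // slightly above the ~20 available cores

// Partition each indicator's interactions by user ID with the same
// partitioner, so rows belonging to the same user tend to be co-located
// across indicators.
val viewsByUser = rawViews.partitionBy(new HashPartitioner(numPartitions))
val buysByUser = rawBuys.partitionBy(new HashPartitioner(numPartitions))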

I’ll keep experimenting and share more results.

All of these tests are with Spark 1.2.0 and Mahout 0.10.

Thank you,
Nikaash Puri
Dmitriy Lyubimov
2016-04-28 17:12:28 UTC
Yes.

Parallelism in Spark makes all the difference.

Since a scatter-type exchange in Spark increases I/O as the number of
splits grows, strong scaling is not achievable. If you just keep increasing
parallelism, there's a point where individual CPU load decreases but the
cumulative I/O cancels out any gains from the parallelism increase. So it
is important to carefully pre-split algorithm inputs using the par()
operator.
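
A minimal sketch of what that pre-splitting looks like in the Samsara DSL,
assuming drmA and drmB are already-loaded DRM operands and that an exact
split count has been chosen by hand (the number is illustrative):

import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._

// Force both operands to a chosen split count before the A'B product,
// rather than letting the product inherit whatever the input files had.
val drmAPar = drmA.par(exact = 30)
val drmBPar = drmB.par(exact = 30)
val drmC = drmAPar.t %*% drmBPar

// par(auto = true) instead re-splits based on configured cluster capacity.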

But assuming the same parallelization strategy before and after, the
release change probably should not affect that either.

-d
Pat Ferrel
2016-04-28 19:12:04 UTC
Hmm, can’t get images through the Apache mail servers.

The image is here: https://drive.google.com/file/d/0B4cAk1SMC1ChWFZiRG9DSEpkdzg/view?usp=sharing


On Apr 28, 2016, at 11:55 AM, Pat Ferrel <***@occamsmachete.com> wrote:

Actually, on your advice Dmitriy, I think these changes went in around 0.11. Before 0.11, par was not called. Any clue here?

This was in relation to the issue of reading a huge number of part files created by Spark Streaming, which probably trickled down to cause too much parallelization. The auto=true fixed that issue for me, but did it have other effects?


Dmitriy Lyubimov
2016-04-28 19:20:29 UTC
(sorry for repetition, the list rejects my previous replies due to quoted
message size)

"Auto" just re-clusters the input per the given _configured cluster
capacity_ (though I think there is a safeguard that doesn't blow up the
number of splits if the initial number is ridiculously small, e.g. it won't
re-cluster a 2-split problem into a 300-split problem).

For some algorithms, this is appropriate.

For others, such as mmul-bound (A'B) problems, there's the "sweet spot" I
mentioned, because I/O bandwidth is a function of the parallelism -- which
technically has nothing to do with available cluster capacity. It is
possible that if you do A.par(auto=true).t %*% B.par(auto=true) you get
worse performance on a 500-task cluster than on a 60-task cluster
(depending on the size of the operands and the product).
Dmitriy Lyubimov
2016-04-29 15:53:28 UTC
Can you please look into the Spark UI and write down how many splits the
job generates in the first stage of the pipeline, or anywhere else there's
a significant variation in the number of splits, in both cases?

The row similarity is a very short pipeline (compared to what would
normally be average), so only the initial input re-splitting is critical.

The split count of the products is adjusted by the optimizer automatically
to match the amount of data per segment observed on average in the
input(s). E.g. if you compute val C = A %*% B, and A has 500 elements per
split and B has 5000 elements per split, then C will have approximately
5000 elements per split (the larger average, in binary operator cases).
That's approximately how it works.

However, the par() that has been added is messing with the initial
parallelism, which naturally affects the rest of the pipeline per the
above. I now doubt it was a good thing -- when I suggested Pat try this, I
did not mean to put it _inside_ the algorithm itself, but rather into the
input preparation code in his particular case. I don't think it will work
in every case. The sweet-spot parallelism for multiplication unfortunately
depends on a ton of factors -- network bandwidth and hardware configuration
-- so it is difficult to guess it well universally. More likely, for
CLI-based prepackaged algorithms (I don't use the CLI but rather assemble
pipelines in Scala via scripting and Scala application code), the initial
parallelization adjustment options should probably be exposed through the
CLI.
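
If the UI is awkward to capture, a crude alternative is to log the split
count of the raw input before it enters the pipeline; a sketch only, with
an illustrative path:

// Plain Spark check of how many splits the first stage starts from.
val raw = sc.textFile("hdfs:///path/to/action-elements")  // illustrative path
println(s"input splits: ${raw.partitions.length}")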
Khurrum Nasim
2016-04-29 16:05:25 UTC
Is that for me, Dmitriy?
Dmitriy Lyubimov
2016-04-29 16:11:17 UTC
I was replying to Nikaash.

Sorry -- the list keeps rejecting replies because of their size, so I had
to remove the quoted content.
Pat Ferrel
2016-04-29 16:15:10 UTC
It’s certainly easy to put this in the driver, taking it out of the algo.

Dmitriy, is it a candidate for an Option param to the algo? That would catch cases where people rely on it now (like my old DStream example) but would easily allow it to be overridden to None to imitate pre-0.11 behavior, or passed in when the app knows better.

Nikaash, are you in a position to comment out the .par(auto=true) and see if it makes a difference?


Dmitriy Lyubimov
2016-04-29 16:23:15 UTC
Yes -- I would do it as an optional parameter, just like par does: do
nothing, try auto, or try an exact number of splits.
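
Something along these lines, as a sketch only; the helper name and the
Option encoding are made up for illustration, not an existing Mahout API:

import scala.reflect.ClassTag
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._

// Hypothetical knob mirroring par(): None leaves the input splits alone
// (pre-0.11 behavior), Some(-1) asks for auto re-splitting from configured
// cluster capacity, and Some(n) forces an exact, caller-chosen split count.
def maybePar[K: ClassTag](drm: DrmLike[K], parOpt: Option[Int]): DrmLike[K] =
  parOpt match {
    case None     => drm
    case Some(-1) => drm.par(auto = true)
    case Some(n)  => drm.par(exact = n)
  }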
Pat Ferrel
2016-04-29 16:48:14 UTC
There are some other changes I want to make for the next rev, so I’ll do that.

Nikaash, it would still be nice to verify that this fixes your problem. Also, if you want to create a Jira it will guarantee I don’t forget.


Nikaash Puri
2016-04-29 18:14:14 UTC
Hi,

Sure, I’ll do some more detailed analysis of the jobs on the UI and share screenshots if possible.

Pat, yup, I’ll only be able to get to this on Monday, though. I’ll comment out the line and see the difference in performance.

Thanks so much for helping guys, I really appreciate it.

Also, the LLR algorithm implementation is extremely performant, at least as of Mahout 0.10. I ran some tests on around 61 days of data (which in our case is a fair amount) and the model was built in about 20 minutes, which is pretty amazing. This was using a pretty decent-sized cluster, though.

Thank you,
Nikaash Puri
Dmitriy Lyubimov
2016-04-29 18:24:57 UTC
Nikaash,

Yes, unfortunately you may need to play with the parallelism manually for
your particular load/cluster to get the best out of it. I guess Pat will be
adding the option.
Pat Ferrel
2016-04-29 19:06:21 UTC
Right, will do. But Nikaash, if you could just comment out those lines and see if it has an effect, it would be informative and perhaps even solve your problem sooner than my changes. No great rush. Playing around with different values, as Dmitriy says, might yield better results; for that you can mess with the code or wait for my changes.

Yeah, it’s fast enough in most cases. The main work is the optimized A’A and A’B stuff in the BLAS optimizer Dmitriy put in. It is something like 10x faster than a similar algo in Hadoop MapReduce. This particular calculation and its generalization are not in any other Spark (or now Flink) lib that I know of.


Nikaash Puri
2016-05-02 10:35:38 UTC
Hi,

I tried commenting out those lines and it did marginally improve the
performance, although the 0.10 version still significantly outperforms it.

Here is a screenshot of the saveAsTextFile job (attached as selection1).
The AtB step took about 34 minutes, which is significantly more than with
0.10. Similarly, the saveAsTextFile action takes about 9 minutes.

The selection2 file is a screenshot of the flatMap at AtB.scala job, which
ran for 34 minutes.

Also, I'm using multiple indicators. With Mahout 0.10, the first AtB would
take time, while subsequent such operations for the other indicators would
be orders of magnitude faster. In the current job, the subsequent AtB
operations take about as long as the first one.

A snapshot of my code is as follows:

var existingRowIDs: Option[BiDictionary] = None

// The first action named in the sequence is the "primary" action and
// begins to fill up the user dictionary
for (actionDescription <- actionInput) {
  // grab the path to actions
  val action: IndexedDataset = SparkEngine.indexedDatasetDFSReadElements(
    actionDescription._2,
    schema = DefaultIndexedDatasetElementReadSchema,
    existingRowIDs = existingRowIDs)
  existingRowIDs = Some(action.rowIDs)

  ...
}

which seems fairly standard, so I hope I'm not making a mistake here.

It looks like the 0.11-onward version uses computeAtBZipped3 to perform
the multiplication in atb_nograph_mmul, unlike 0.10, which was using
atb_nograph. Though I'm not really sure whether that makes much of a
difference.
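
For completeness, the datasets collected in that loop then go into the
cooccurrence call roughly as below; this is a sketch only, where actions is
a hypothetical accumulator for the IndexedDatasets built above, the method
is the one referred to earlier in the thread (SimilarityAnalysis.cooccurrencesIDSs),
and its default thresholds are assumed:

import org.apache.mahout.math.cf.SimilarityAnalysis

// actions: Seq[IndexedDataset], with the primary action first; the returned
// list holds one indicator matrix per input dataset.
val indicatorMatrices = SimilarityAnalysis.cooccurrencesIDSs(actions.toArray)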

Thank you,
Nikaash Puri
Post by Pat Ferrel
Right, will do. But Nakaash if you could just comment out those lines and
see if it has an effect it would be informative and even perhaps solve your
problem sooner than my changes. No great rush. Playing around with
different values, as Dmitriy says, might yield better results and for that
you can mess with the code or wait for my changes.
Yeah, it’s fast enough in most cases. The main work is the optimized A’A,
A’B stuff in the BLAS optimizer Dmitriy put in. It is something like 10x
faster than a similar algo in Hadoop MR. This particular calc and
generalization is not in any other Spark or now Flink lib that I know of.
Nikaash,
yes unfortunately you may need to play with parallelism for your particular
load/cluster manually to get the best out of it. I guess Pat will be adding
the option.
Post by Nikaash Puri
Hi,
Sure, I’ll do some more detailed analysis of the jobs on the UI and share
screenshots if possible.
Pat, yup, I’ll only be able to get to this on Monday, though. I’ll comment
out the line and see the difference in performance.
Thanks so much for helping, guys, I really appreciate it.
Also, the algorithm implementation for LLR is extremely performant, at
least as of Mahout 0.10. I ran some tests for around 61 days of data (which
in our case is a fair amount) and the model was built in about 20 minutes,
which is pretty amazing. This was using a pretty decent sized cluster,
though.
Thank you,
Nikaash Puri
There are some other changes I want to make for the next rev, so I’ll do that.
Nikaash, it would still be nice to verify this fixes your problem; also, if
you want to create a Jira it will guarantee I don’t forget.
yes -- i would do it as an optional option -- just like par does -- do
nothing, try auto, or try an exact number of splits
It’s certainly easy to put this in the driver, taking it out of the algo.
Dmitriy, is it a candidate for an Option param to the algo? That would
catch cases where people rely on it now (like my old DStream example) but
easily allow it to be overridden to None to imitate pre 0.11, or passed in
when the app knows better.
Nikaash, are you in a position to comment out the .par(auto=true) and see
if it makes a difference?
can you please look into the Spark UI and write down how many splits the job
generates in the first stage of the pipeline, or anywhere else there's
significant variation in # of splits in both cases?
the row similarity is a very short pipeline (in comparison with what would
normally be average), so only the first input re-splitting is critical.
The splitting along the products is adjusted by the optimizer automatically
to match the number of data segments observed on average in the input(s); e.g.
if you compute val C = A %*% B and A has 500 elements per split and B has
5000 elements per split, then C would approximately have 5000 elements per
split (the larger average in binary operator cases). That's approximately
how it works.
However, the par() that has been added is messing with the initial parallelism,
which naturally affects the rest of the pipeline per the above. I now doubt it
was a good thing -- when i suggested Pat try this, i did not mean to put
it _inside_ the algorithm itself, but rather into the accurate input
preparation code in his particular case. However, I don't think it will
work in any given case. Actually, the sweet-spot parallelism for multiplication
unfortunately depends on tons of factors -- network bandwidth and hardware
configuration -- so it is difficult to give it a good guess universally. More
likely, for CLI-based prepackaged algorithms (I don't use the CLI but rather
assemble pipelines in Scala via scripting and Scala application code) the
initial parallelization adjustment options should probably be provided to the
CLI.
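To make the split bookkeeping Dmitriy asks for less manual, a small helper can
print partition counts directly instead of reading them off the Spark UI. This
is only a sketch: it assumes the Spark backend's checkpointed DRM is a
CheckpointedDrmSpark that exposes its backing row RDD, and drmA / drmB stand in
for the actual inputs.

import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark

def splitsOf(name: String, drm: DrmLike[Int]): Unit = {
  val cp = drm.checkpoint()
  // relies on the Spark-side implementation class to reach the backing RDD
  val n = cp.asInstanceOf[CheckpointedDrmSpark[Int]].rdd.partitions.length
  println(s"$name: $n splits, ${cp.nrow} x ${cp.ncol}")
}

splitsOf("A", drmA)
splitsOf("A'B", drmA.t %*% drmB)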
Dmitriy Lyubimov
2016-05-02 16:06:57 UTC
Permalink
ok.

Nikaash,
could you perhaps do one more experiment and graph the 0.10 a'b code into
0.12 code (or whatever branch you say is not working the same), so we can
confirm that the culprit change is indeed A'B?

thank you very much.

-d
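Besides grafting the class, a quick way to check whether A'B itself regressed
is to time the bare product on synthetic inputs under both Mahout versions,
outside of row similarity. A rough sketch, meant to be run from a Mahout Spark
shell (so an implicit distributed context is in scope); the sizes and split
counts are made-up placeholders:

import org.apache.mahout.math.Matrices
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._

val inCoreA = Matrices.symmetricUniformView(10000, 200, 1234)   // stand-in for users x items
val inCoreB = Matrices.symmetricUniformView(10000, 100, 5678)   // stand-in for users x other action
val drmA = drmParallelize(inCoreA, numPartitions = 8)
val drmB = drmParallelize(inCoreB, numPartitions = 8)

val t0 = System.nanoTime()
val sums = (drmA.t %*% drmB).colSums()   // an action, so it forces the A'B plan to actually run
println(s"A'B wall time: ${(System.nanoTime() - t0) / 1e9} s")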
Post by Nikaash Puri
Hi,
I tried commenting out those lines and it did marginally improve the
performance, although the 0.10 version still significantly outperforms it.
Here is a screenshot of the saveAsTextFile job (attached as selection1).
The AtB step took about 34 mins, which is significantly more than using
0.10. Similarly, the saveAsTextFile action takes about 9 mins as well.
The selection2 file is a screenshot of the flatMap at AtB.scala job, which
ran for 34 minutes.
Also, I'm using multiple indicators. As of Mahout 0.10, the first AtB
would take time, while subsequent such operations for the other indicators
would be orders of magnitude faster. In the current job, the subsequent
AtB operations take time similar to the first one.
var existingRowIDs: Option[BiDictionary] = None
// The first action named in the sequence is the "primary" action and begins to fill up the user dictionary
for (actionDescription <- actionInput) {
  // grab the path to actions
  val action: IndexedDataset = SparkEngine.indexedDatasetDFSReadElements(
    actionDescription._2,
    schema = DefaultIndexedDatasetElementReadSchema,
    existingRowIDs = existingRowIDs)
  existingRowIDs = Some(action.rowIDs)
  ...
}
which seems fairly standard, so I hope I'm not making a mistake here.
It looks like the 0.11 onward version is using computeAtBZipped3 for
performing the multiplication in atb_nograph_mmul, unlike 0.10 which was
using atb_nograph. Though I'm not really sure whether that makes much of a
difference.
Thank you,
Nikaash Puri
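One way to narrow down where the later indicators lose their old speed-up is
to materialize the shared primary matrix once, up front, and then compare the
per-indicator A'B timings. The snippet below is only a diagnostic sketch with
hypothetical names (actions as the list of IndexedDatasets loaded by the loop
above); it assumes IndexedDataset exposes its DRM as matrix, as the Spark
implementation does. If the subsequent A'B stages stay slow even with the
primary matrix already materialized, the time is going into A'B itself rather
than into re-reading or recomputing the shared input.

import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.math.indexeddataset.IndexedDataset

val primary: IndexedDataset = actions.head   // the "primary" action dataset
primary.matrix.colSums()                     // cheap action: forces the rows to be computed (and cached, if a cache hint applies)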
Post by Pat Ferrel
Right, will do. But Nikaash, if you could just comment out those lines and
see if it has an effect, it would be informative and might even solve your
problem sooner than my changes. No great rush. Playing around with
different values, as Dmitriy says, might yield better results, and for that
you can mess with the code or wait for my changes.
Dmitriy Lyubimov
2016-05-03 00:56:48 UTC
Permalink
graph = graft, sorry. Graft just the AtB class into 0.12 codebase.
Nikaash Puri
2016-05-05 04:41:34 UTC
Permalink
Hi,

Ok, so another interesting result. When I compute cross-cooccurrences with
user profile attributes that have high cardinality (for instance city), the
AtB step completes in roughly 11 minutes on some data set. Now, if I do the
same calculation on a profile attribute such as gender, with just two
distinct values, the AtB step is much slower. In my case, the profile
attribute I was using had a small number of distinct values.

Could this be because the indicator matrix no longer remains sparse
(just venturing a guess here)?

These results are from Mahout 0.10 and Spark 1.2.0.

Thank you,
Nikaash Puri
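A quick way to test that guess is to measure the per-column density of the
attribute matrix. A sketch only: drmGender is a hypothetical users x
attribute-values DRM with 0/1 entries, so the column sums are the non-zero
counts and dividing by the row count gives per-column density. A two-value
attribute like gender should show columns that are roughly 50% dense, versus
tiny fractions for a high-cardinality attribute like city.

import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.math.scalabindings.RLikeOps._

val nUsers = drmGender.nrow.toDouble
val densityPerColumn = drmGender.colSums() / nUsers   // ~0.5 per column for a two-value attribute
println(densityPerColumn)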