Discussion:
[jira] [Updated] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.
Dmitriy Lyubimov (JIRA)
2015-11-15 00:04:10 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmitriy Lyubimov updated MAHOUT-1790:
-------------------------------------
Sprint: Nov/Dec-2015
SparkEngine nnz overflow resultSize when reducing.
--------------------------------------------------
Key: MAHOUT-1790
URL: https://issues.apache.org/jira/browse/MAHOUT-1790
Project: Mahout
Issue Type: Bug
Components: spark
Affects Versions: 0.11.1
Reporter: Michel Lemay
Priority: Minor
ERROR TaskSetManager: Total size of serialized results of nnn tasks (1031.7 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 267 tasks (1024.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1942)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1003)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)
at org.apache.mahout.sparkbindings.SparkEngine$.numNonZeroElementsPerColumn(SparkEngine.scala:86)
at org.apache.mahout.math.drm.CheckpointedOps.numNonZeroElementsPerColumn(CheckpointedOps.scala:37)
at org.apache.mahout.math.cf.SimilarityAnalysis$.sampleDownAndBinarize(SimilarityAnalysis.scala:286)
at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrences(SimilarityAnalysis.scala:66)
at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrencesIDSs(SimilarityAnalysis.scala:141)
This occurs because it uses a DenseVector, and Spark seemingly aggregates all of them on the driver before reducing.
I think this could easily be prevented with a treeReduce(_ += _, depth) instead of a reduce(_ += _):
val maxResultSize = ... // the driver's result-size budget (spark.driver.maxResultSize), in bytes
val numPartitions = drm.rdd.partitions.size
val n = drm.ncol
val bytesPerVector = n * 8 + overhead // 8 bytes per double entry, plus per-vector serialization overhead
val maxVectors = maxResultSize / bytesPerVector / 2 + 1 // use only half the budget, to be safe
val depth = math.max(1, math.ceil(math.log(1 + numPartitions / maxVectors) / math.log(2)).toInt)
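As a rough sketch of that suggestion (hypothetical names like nnzPerColumn and rows; this is not the actual SparkEngine code, just the proposed change spelled out):

import scala.collection.JavaConverters._
import org.apache.spark.rdd.RDD
import org.apache.mahout.math.{DenseVector, Vector}
import org.apache.mahout.math.function.Functions

def nnzPerColumn(rows: RDD[(Int, Vector)], ncol: Int, depth: Int): Vector =
  rows.map { case (_, row) =>
    // One dense counter per row: 1.0 in every column where the row has a nonzero.
    val counts: Vector = new DenseVector(ncol)
    for (el <- row.nonZeroes().asScala)
      counts.setQuick(el.index, counts.getQuick(el.index) + 1.0)
    counts
  }.treeReduce((a, b) => { a.assign(b, Functions.PLUS); a }, depth)

With reduce(), every task ships its dense counter vector straight to the driver; treeReduce() merges the partial sums on the executors in roughly log2(numPartitions) rounds, so the driver only ever receives a handful of vectors.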
Dmitriy Lyubimov (JIRA)
2015-11-15 00:12:11 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15005677#comment-15005677 ]

Dmitriy Lyubimov commented on MAHOUT-1790:
------------------------------------------

+1. treeReduce() is a new API that appeared in Spark 1.3. The only complaint I've heard about this API was "why didn't they do it in the first place". They may have taken care of optimizing the depth, too. It is semantically equivalent to reduce().

This means the fix is applicable to the Mahout 0.11.x and later branches.

[~FlamingMike] In the spirit of the Apache "power of do", can you try to suggest a patch?
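For reference, a toy spark-shell check of that equivalence (illustrative only):

val rdd = sc.parallelize(1 to 1000, numSlices = 100)
assert(rdd.reduce(_ + _) == rdd.treeReduce(_ + _, depth = 2)) // same result, fewer driver-side merges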
Dmitriy Lyubimov (JIRA)
2015-11-15 00:13:10 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmitriy Lyubimov reassigned MAHOUT-1790:
----------------------------------------

Assignee: Dmitriy Lyubimov
Suneel Marthi (JIRA)
2016-03-15 03:14:33 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15194674#comment-15194674 ]

Suneel Marthi commented on MAHOUT-1790:
---------------------------------------

any updates on this?
Suneel Marthi (JIRA)
2016-03-15 03:14:33 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Suneel Marthi updated MAHOUT-1790:
----------------------------------
Fix Version/s: 0.12.0
Dmitriy Lyubimov (JIRA)
2016-03-25 17:25:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15212117#comment-15212117 ]

Dmitriy Lyubimov commented on MAHOUT-1790:
------------------------------------------

[~smarthi]
(1) Actually, I don't know why this comment (of mine) got in here. This is a different issue altogether -- it is an issue of a Spark setting. When a large volume of information is collected to the front end, Spark by default limits it to a certain size (1 GB), apparently in an attempt to keep the backend robust against runaway processes and ill programming. If a legitimately big collection to the front end exceeds 1 GB, the solution is simply to bump up this setting in Spark (spark.driver.maxResultSize, the setting named in the error above).

(2) I still have a question as to why the invoked operation would cause such a big collection to the front end -- this needs to be investigated. However, this capability (numNonZero etc.) was added by either [~ssc] or [~pferrel], so I'd suggest those folks take a look at it again. I don't have first-hand knowledge of this logic.

If everything else fails, I'll take a look at some point -- but not soon, as it is not a priority for me.
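For example, the bump might look like this (the 2g value is illustrative; the key is the standard Spark setting named in the error):

val conf = new org.apache.spark.SparkConf()
  .set("spark.driver.maxResultSize", "2g") // default is 1g; "0" removes the limit entirely

or equivalently --conf spark.driver.maxResultSize=2g on spark-submit.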
Suneel Marthi (JIRA)
2016-03-25 17:38:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Suneel Marthi updated MAHOUT-1790:
----------------------------------
Comment: was deleted

(was: any updates on this?)
Suneel Marthi (JIRA)
2016-03-30 07:00:34 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Suneel Marthi updated MAHOUT-1790:
----------------------------------
Fix Version/s: 0.12.1 (was: 0.12.0)
Dmitriy Lyubimov (JIRA)
2016-03-30 17:18:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmitriy Lyubimov updated MAHOUT-1790:
-------------------------------------
Sprint: Jan/Feb-2016 (was: Nov/Dec-2015)
Dmitriy Lyubimov (JIRA)
2016-03-30 17:25:26 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmitriy Lyubimov updated MAHOUT-1790:
-------------------------------------
Sprint: Mar/Apr-2016 (was: Jan/Feb-2016)
Dmitriy Lyubimov (JIRA)
2016-03-30 17:26:26 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmitriy Lyubimov updated MAHOUT-1790:
-------------------------------------
Sprint: Jan/Feb-2016 (was: Mar/Apr-2016)
Andrew Palumbo (JIRA)
2016-05-03 18:14:12 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15269259#comment-15269259 ]

Andrew Palumbo commented on MAHOUT-1790:
----------------------------------------

Is this a valid issue?
Andrew Palumbo (JIRA)
2016-05-10 03:37:12 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15277548#comment-15277548 ]

Andrew Palumbo commented on MAHOUT-1790:
----------------------------------------

[~dlyubimov], [~pferrel] is there something to do here?
Andrew Palumbo (JIRA)
2016-05-10 03:38:12 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo updated MAHOUT-1790:
-----------------------------------
Fix Version/s: 1.0.0 (was: 0.12.1)
Andrew Palumbo (JIRA)
2016-05-11 03:59:12 UTC
Permalink
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo resolved MAHOUT-1790.
------------------------------------
Resolution: Not A Bug

Resolving this issue as not a bug, since it seems to be a configuration issue and there has been no movement on it in some time. [~FlamingMike] Please let us know if there is still an issue here.
Michel Lemay (JIRA)
2016-05-11 11:39:12 UTC
Permalink
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15279990#comment-15279990 ]

Michel Lemay commented on MAHOUT-1790:
--------------------------------------

Although there is a simple workaround in configuration, I don't think it is a good design choice to close this issue as not-a-bug when such a simple fix would make the code robust to this scenario.

When computing the log-likelihood of an event (which requires computing nnz) over a very large but sparse matrix, you end up crashing Spark. The driver should never have to deal with such a large collection of data; the whole design of map-reduce is to prevent this kind of wasteful aggregation on a single node.

It's your call but I don't agree with your resolution.
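
For reference, the configuration workaround mentioned above is presumably to raise (or disable) the driver's result-size cap, e.g.:

spark-submit --conf spark.driver.maxResultSize=4g ...

or programmatically:

import org.apache.spark.SparkConf
val conf = new SparkConf().set("spark.driver.maxResultSize", "4g") // "0" disables the limit

This unblocks the job, but every task still ships a dense n-element vector to the driver, which is exactly the traffic pattern a treeReduce would avoid.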
Andrew Palumbo (JIRA)
2016-05-11 15:12:12 UTC
Permalink
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo reopened MAHOUT-1790:
------------------------------------

Thanks for the input, [~FlamingMike]; reopening this. Would you be able to provide a pull request with your fix for review?
Andrew Palumbo (JIRA)
2016-05-11 15:12:12 UTC
Permalink
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo updated MAHOUT-1790:
-----------------------------------
Fix Version/s: 0.13.0 (was: 1.0.0)
Andrew Palumbo (JIRA)
2016-12-19 05:56:58 UTC
Permalink
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15760250#comment-15760250 ]

Andrew Palumbo commented on MAHOUT-1790:
----------------------------------------

What's the status here [~FlamingMike]?
Andrew Palumbo (JIRA)
2016-12-20 18:58:58 UTC
Permalink
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo updated MAHOUT-1790:
-----------------------------------
Sprint: Jan/Feb-2017 (was: Jan/Feb-2016)
Andrew Palumbo (JIRA)
2017-01-16 02:28:28 UTC
Permalink
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo updated MAHOUT-1790:
-----------------------------------
Sprint: Jan/Feb-2016 (was: Jan/Feb-2017)
Andrew Palumbo (JIRA)
2017-01-16 02:28:29 UTC
Permalink
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo updated MAHOUT-1790:
-----------------------------------
Sprint: Jan/Feb-2017 (was: Jan/Feb-2016)
Andrew Palumbo (JIRA)
2017-01-16 03:21:26 UTC
Permalink
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo reassigned MAHOUT-1790:
--------------------------------------

Assignee: Andrew Palumbo (was: Dmitriy Lyubimov)
Andrew Palumbo (JIRA)
2017-01-16 03:23:26 UTC
Permalink
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15823400#comment-15823400 ]

Andrew Palumbo commented on MAHOUT-1790:
----------------------------------------

[~dlyubimov] Do you have any thoughts on this?
Andrew Palumbo (JIRA)
2017-02-01 22:56:51 UTC
Permalink
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo updated MAHOUT-1790:
-----------------------------------
Sprint: (was: Jan/Feb-2017)
Andrew Palumbo (JIRA)
2017-02-01 22:56:51 UTC
Permalink
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo updated MAHOUT-1790:
-----------------------------------
Fix Version/s: 0.13.1 (was: 0.13.0)
Trevor Grant (JIRA)
2017-06-23 04:23:00 UTC
Permalink
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Trevor Grant updated MAHOUT-1790:
---------------------------------
Fix Version/s: 0.13.2 (was: 0.13.1)