Dmitriy Lyubimov (JIRA)
2015-11-15 00:04:10 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dmitriy Lyubimov updated MAHOUT-1790:
-------------------------------------
Sprint: Nov/Dec-2015
This message was sent by Atlassian JIRA
(v6.3.4#6332)
Dmitriy Lyubimov updated MAHOUT-1790:
-------------------------------------
Sprint: Nov/Dec-2015
SparkEngine nnz overflow resultSize when reducing.
--------------------------------------------------
Key: MAHOUT-1790
URL: https://issues.apache.org/jira/browse/MAHOUT-1790
Project: Mahout
Issue Type: Bug
Components: spark
Affects Versions: 0.11.1
Reporter: Michel Lemay
Priority: Minor
ERROR TaskSetManager: Total size of serialized results of nnn tasks (1031.7 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 267 tasks (1024.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1942)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1003)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)
at org.apache.mahout.sparkbindings.SparkEngine$.numNonZeroElementsPerColumn(SparkEngine.scala:86)
at org.apache.mahout.math.drm.CheckpointedOps.numNonZeroElementsPerColumn(CheckpointedOps.scala:37)
at org.apache.mahout.math.cf.SimilarityAnalysis$.sampleDownAndBinarize(SimilarityAnalysis.scala:286)
at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrences(SimilarityAnalysis.scala:66)
at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrencesIDSs(SimilarityAnalysis.scala:141)
This occurs because it uses a DenseVector and spark seemingly aggregate all of them on the driver before reducing.
I think this could be easily prevented with a treeReduce(_ += _, depth) instead of a reduce(_ += _)
val maxResultSize = ....
val numPartitions = drm.rdd.partitions.size
val n = drm.ncol
val bytesPerVector = n * 8 + overhead?
val maxVectors = maxResultSize / bytes / 2 + 1 // be safe
val depth = math.max(1, math.ceil(math.log(1 + numPartitions / maxVectors) / math.log(2)).toInt)
----------------------------------------------------
Key: MAHOUT-1790
URL: https://issues.apache.org/jira/browse/MAHOUT-1790
Project: Mahout
Issue Type: Bug
Components: spark
Affects Versions: 0.11.1
Reporter: Michel Lemay
Priority: Minor
ERROR TaskSetManager: Total size of serialized results of nnn tasks (1031.7 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 267 tasks (1024.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1942)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1003)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)
at org.apache.mahout.sparkbindings.SparkEngine$.numNonZeroElementsPerColumn(SparkEngine.scala:86)
at org.apache.mahout.math.drm.CheckpointedOps.numNonZeroElementsPerColumn(CheckpointedOps.scala:37)
at org.apache.mahout.math.cf.SimilarityAnalysis$.sampleDownAndBinarize(SimilarityAnalysis.scala:286)
at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrences(SimilarityAnalysis.scala:66)
at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrencesIDSs(SimilarityAnalysis.scala:141)
This occurs because it uses a DenseVector and spark seemingly aggregate all of them on the driver before reducing.
I think this could be easily prevented with a treeReduce(_ += _, depth) instead of a reduce(_ += _)
val maxResultSize = ....
val numPartitions = drm.rdd.partitions.size
val n = drm.ncol
val bytesPerVector = n * 8 + overhead?
val maxVectors = maxResultSize / bytes / 2 + 1 // be safe
val depth = math.max(1, math.ceil(math.log(1 + numPartitions / maxVectors) / math.log(2)).toInt)
This message was sent by Atlassian JIRA
(v6.3.4#6332)