[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234189#comment-15234189 ]
ASF GitHub Bot commented on MAHOUT-1750:
----------------------------------------
Github user andrewpalumbo commented on a diff in the pull request:
https://github.com/apache/mahout/pull/215#discussion_r59137058
--- Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpABt.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.flinkbindings.blas
+
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.util.Collector
+import org.apache.mahout.logging._
+import org.apache.mahout.math.drm.{BlockifiedDrmTuple, DrmTuple}
+import org.apache.mahout.math.drm.logical.{OpAB, OpABt}
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.{Matrix, SparseMatrix, SparseRowMatrix}
+import org.apache.mahout.flinkbindings._
+import org.apache.mahout.flinkbindings.drm._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.mahout.flinkbindings._
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
+import org.apache.mahout.math.{Matrix, Vector}
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.logical.OpABt
+import org.apache.mahout.math.scalabindings.RLikeOps._
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+import org.apache.flink.api.scala.createTypeInformation
+
+/** Contains DataSet plans for ABt operator */
+object FlinkOpABt {
+
+ private final implicit val log = getLog(FlinkOpABt.getClass)
+
+ /**
+ * General entry point for AB' operator.
+ *
+ * @param operator the AB' operator
+ * @param srcA A source DataSet
+ * @param srcB B source DataSet
+ * @tparam K
+ */
+ def abt[K: ClassTag: TypeInformation](
+ operator: OpABt[K],
+ srcA: FlinkDrm[K],
+ srcB: FlinkDrm[Int]): FlinkDrm[K] = {
+
+ debug("operator AB'(Flink)")
+ abt_nograph[K](operator, srcA, srcB)
+ }
+
+ /**
+ * Computes AB'
+ *
+ * General idea here is that we split both A and B vertically into blocks (one block per split),
+ * then compute cartesian join of the blocks of both data sets. This creates tuples of the form of
+ * (A-block, B-block). We enumerate A-blocks and transform this into (A-block-id, A-block, B-block)
+ * and then compute A-block %*% B-block', thus producing tuples (A-block-id, AB'-block).
+ *
+ * The next step is to group the above tuples by A-block-id and stitch al AB'-blocks in the group
+ * horizontally, forming single vertical block of the final product AB'.
+ *
+ * This logic is complicated a little by the fact that we have to keep block row and column keys
+ * so that the stitching of AB'-blocks happens according to integer row indices of the B input.
+ */
+ private[flinkbindings] def abt_nograph[K: ClassTag: TypeInformation](
+ operator: OpABt[K],
+ srcA: FlinkDrm[K],
+ srcB: FlinkDrm[Int]): FlinkDrm[K] = {
+
+ // Blockify everything.
+ val blocksA = srcA.asBlockified
+ val blocksB = srcB.asBlockified
+
+ val prodNCol = operator.ncol
+ val prodNRow = operator.nrow
+
+
+ // blockwise multiplication function
+ def mmulFunc(tupleA: (Array[K], Matrix), tupleB: (Array[Int], Matrix)): (Array[K], Array[Int], Matrix) = {
+ val (keysA, blockA) = tupleA
+ val (keysB, blockB) = tupleB
+
+ var ms = traceDo(System.currentTimeMillis())
+
+ // We need to send keysB to the aggregator in order to know which columns are being updated.
+ val result = (keysA, keysB, blockA %*% blockB.t)
+
+ ms = traceDo(System.currentTimeMillis() - ms.get)
+ trace(
+ s"block multiplication of(${blockA.nrow}x${blockA.ncol} x ${blockB.ncol}x${blockB.nrow} is completed in $ms " +
+ "ms.")
+ trace(s"block multiplication types: blockA: ${blockA.getClass.getName}(${blockA.t.getClass.getName}); " +
+ s"blockB: ${blockB.getClass.getName}.")
+
+ result.asInstanceOf[(Array[K], Array[Int], Matrix)]
+ }
+
+
+ implicit val typeInformation = createTypeInformation[(Array[K], Matrix)]
+ implicit val typeInformation2 = createTypeInformation[(Int, (Array[K], Array[Int], Matrix))]
+ implicit val typeInformation3 = createTypeInformation[(Array[K], Array[Int], Matrix)]
+
+ val blockwiseMmulDataSet =
+
+ // Combine blocks pairwise.
+ pairwiseApply(blocksA.asBlockified.ds, blocksB.asBlockified.ds, mmulFunc)
+
+ // Now reduce proper product blocks.
+ // group by the partition key
+ .groupBy(0)
+
+ // combine as transpose
+ .combineGroup(new GroupCombineFunction[(Int, (Array[K], Array[Int], Matrix)), (Array[K], Matrix)] {
+
+ def combine(values: java.lang.Iterable[(Int, (Array[K], Array[Int], Matrix))],
+ out: Collector[(Array[K], Matrix)]): Unit = {
+ val tuple = values.iterator().next
+ val rowKeys = tuple._2._1
+ val colKeys = tuple._2._2
+ val block = tuple._2._3
+
+ val comb = new SparseMatrix(prodNCol, block.nrow).t
+ for ((col, i) <- colKeys.zipWithIndex) comb(::, col) := block(::, i)
+ val res = (rowKeys, comb)
+
+ out.collect(res)
+ }
+ })
+
+ // reduce into a final Blockified matrix
+ .reduce(new ReduceFunction[(Array[K], Matrix)] {
+
+ def reduce(mx1: (Array[K], Matrix), mx2: (Array[K], Matrix)): (Array[K], Matrix) = {
+ mx1._2 += mx2._2
+ mx1
+ }
+ })
+
+
+ new BlockifiedFlinkDrm(ds = blockwiseMmulDataSet, ncol = prodNCol)
+
+ }
+ /**
+ * This function tries to use join instead of cartesian to group blocks together without bloating
+ * the number of partitions. Hope is that we can apply pairwise reduction of block pair right away
+ * so if the data to one of the join parts is streaming, the result is still fitting to memory,
+ * since result size is much smaller than the operands.
+ *
+ * @param blocksA blockified DataSet for A
+ * @param blocksB blockified DataSet for B
+ * @param blockFunc a function over (blockA, blockB). Implies `blockA %*% blockB.t` but perhaps may be
+ * switched to another scheme based on which of the sides, A or B, is bigger.
+ */
+ private def pairwiseApply[K1: ClassTag: TypeInformation, K2: ClassTag: TypeInformation,
+ T: ClassTag: TypeInformation]
+ ( blocksA: BlockifiedDrmDataSet[K1], blocksB: BlockifiedDrmDataSet[K2], blockFunc:
+ ((Array[K1], Matrix), (Array[K2], Matrix)) =>
+ (Array[K1], Array[Int], Matrix) ): DataSet[(Int, (Array[K1], Array[Int], Matrix))] = {
+
+ implicit val typeInformationA = createTypeInformation[(Int, Array[K1], Matrix)]
+ implicit val typeInformationProd = createTypeInformation[(Int, (Array[K1], Array[Int], Matrix))]
+
+ // We will be joining blocks in B to blocks in A using A-partition as a key.
+
+ // Prepare A side.
+ val blocksAKeyed = blocksA.mapPartition( new RichMapPartitionFunction[(Array[K1], Matrix),
+ (Int, Array[K1], Matrix)] {
+ // partition number
+ var part: Int = 0
+
+ // get the index of the partition
+ override def open(params: Configuration): Unit = {
+ part = getRuntimeContext.getIndexOfThisSubtask
+ }
+
+ // bind the partition number to each keySet/block
+ def mapPartition(values: java.lang.Iterable[(Array[K1], Matrix)], out: Collector[(Int, Array[K1], Matrix)]): Unit = {
+
+ val blockIter = values.iterator()
+ if (blockIter.hasNext()) {
+ val r = part -> blockIter.next
+ require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and A of AB'")
+ out.collect((r._1, r._2._1, r._2._2))
+ }
+ }
+ })
+
+ implicit val typeInformationB = createTypeInformation[(Int, (Array[K2], Matrix))]
+
+ // Prepare B-side.
+// val blocksBKeyed = blocksB.flatMap(bTuple => for (blockKey <- (0 until aParts).view) yield blockKey -> bTuple )
+
+ val blocksBKeyed = blocksB.mapPartition( new RichMapPartitionFunction[(Array[K2], Matrix),
+ (Int, Array[K2], Matrix)] {
+ // partition number
+ var part: Int = 0
+
+ // get the index of the partition
+ override def open(params: Configuration): Unit = {
+ part = getRuntimeContext.getIndexOfThisSubtask
+ }
+
+ // bind the partition number to each keySet/block
+ def mapPartition(values: java.lang.Iterable[(Array[K2], Matrix)], out: Collector[(Int, Array[K2], Matrix)]): Unit = {
+
+ val blockIter = values.iterator()
+ if (blockIter.hasNext()) {
+ val r = part -> blockIter.next
+ require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and A of AB'")
+ out.collect((r._1, r._2._1, r._2._2))
+ }
+ }
+ })
+
+
+ implicit val typeInformationJ = createTypeInformation[(Int, ((Array[K1], Matrix),(Int, (Array[K2], Matrix))))]
+ implicit val typeInformationJprod = createTypeInformation[(Int, T)]
+
+
+ // Perform the inner join.
+ val joined = blocksAKeyed.join(blocksBKeyed).where(0).equalTo(0)
+
+ // Apply product function which should produce smaller products. Hopefully, this streams blockB's in
+ val mapped = joined.rebalance().map{tuple => tuple._1._1 ->
--- End diff --
@smarthi Not sure, I was testing a theory with that. I should take that out.
Post by Alexey Grigorev (JIRA)Mahout DSL for Flink: Implement ABt
-----------------------------------
Key: MAHOUT-1750
URL: https://issues.apache.org/jira/browse/MAHOUT-1750
Project: Mahout
Issue Type: Task
Components: Math
Affects Versions: 0.10.2
Reporter: Alexey Grigorev
Assignee: Andrew Palumbo
Priority: Minor
Fix For: 0.12.0
Now ABt is expressed through AtB, which is not optimal, and we need to have a special implementation for ABt
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)