@Dmitriy Sir
I have completed the k-means code as per the algorithm you outlined above.
My code is as follows.
This code works fine up to step 10.
In step 11 I am assigning the new centroid index to the corresponding row key
of the data point in the matrix.
I think I am doing something wrong in step 11; maybe I am using incorrect
syntax.
Can you help me find out what I am doing wrong?
//start of main method
def main(args: Array[String]) {
  //1. initialize the Spark and Mahout context
  val conf = new SparkConf()
    .setAppName("DRMExample")
    .setMaster(args(0))
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator",
      "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
  implicit val sc = new SparkDistributedContext(new SparkContext(conf))

  //2. read the data file and save it in the RDD
  val lines = sc.textFile(args(1))

  //3. convert the data read in as strings into arrays of Double
  val test = lines.map(line => line.split('\t').map(_.toDouble))

  //4. add a column having value 1 to each array of Double; this will
  //   create something like (1 | D), which will be used while
  //   calculating (1 | D)'
  val augumentedArray = test.map(addCentriodColumn _)

  //5. convert the RDD of Array[Double] into an RDD of DenseVector
  val rdd = augumentedArray.map(dvec(_))

  //6. convert the RDD to a DrmRdd
  val rddMatrixLike: DrmRdd[Int] = rdd.zipWithIndex.map { case (v, idx) => (idx.toInt, v) }

  //7. convert the DrmRdd to a CheckpointedDrm[Int]
  val matrix = drmWrap(rddMatrixLike)

  //8. separate out the column having all ones created in step 4; we will use it later
  val oneVector = matrix(::, 0 until 1)

  //9. final input data in DrmLike[Int] format
  val dataDrmX = matrix(::, 1 until 4)

  //9. sampling to select the initial centroids
  val centriods = drmSampleKRows(dataDrmX, 2, false)
  centriods.size

  //10. broadcasting the initial centroids
  val broadCastMatrix = drmBroadcast(centriods)

  //11. iterating over the data matrix (in DrmLike[Int] format) to assign
  //    each data point to its closest centroid
  dataDrmX.mapBlock() {
    case (keys, block) =>
      for (row <- 0 until block.nrow) {
        val dataPoint = block(row, ::)

        //12. findTheClosestCentriod finds the closest centroid to the
        //    data point specified by "dataPoint"
        val closesetIndex = findTheClosestCentriod(dataPoint, centriods)

        //13. assigning the closest index to the key
        keys(row) = closesetIndex
      }
      keys -> block
  }

  //14. calculating (1 | D)
  val b = (oneVector cbind dataDrmX)

  //15. aggregating transpose (1 | D)'
  val bTranspose = (oneVector cbind dataDrmX).t
  /* after step 15, bTranspose will have data in the following format:
   * (n+1) x K, where n = dimension of the data point, K = number of clusters;
   * the zeroth row will contain the count of points assigned to each cluster
   * (assuming 3-d data points)
   */

  val nrows = b.nrow.toInt

  //16. slicing the count vectors out
  val pointCountVectors = drmBroadcast(b(0 until 1, ::).collect(0, ::))
  val vectorSums = b(1 until nrows, ::)

  //17. dividing the data point sums by the count vector
  vectorSums.mapBlock() {
    case (keys, block) =>
      for (row <- 0 until block.nrow) {
        block(row, ::) /= pointCountVectors
      }
      keys -> block
  }

  //18. separating the count vectors
  val newCentriods = vectorSums.t(::, 1 until centriods.size)

  //19. iterate over the above code till the convergence criteria is met
} //end of main method
// method to find the closest centroid to the data point (vec: Vector in the arguments)
def findTheClosestCentriod(vec: Vector, matrix: Matrix): Int = {
  var index = 0
  var closest = Double.PositiveInfinity
  for (row <- 0 until matrix.nrow) {
    val squaredSum = ssr(vec, matrix(row, ::))
    val tempDist = Math.sqrt(squaredSum)
    if (tempDist < closest) {
      closest = tempDist
      index = row
    }
  }
  index
}
// calculating the sum of squared differences between the points (vectors)
def ssr(a: Vector, b: Vector): Double = {
  ((a - b) ^= 2).sum
}
// method used to create (1 | D)
def addCentriodColumn(arg: Array[Double]): Array[Double] = {
  val newArr = new Array[Double](arg.length + 1)
  newArr(0) = 1.0
  for (i <- 0 until arg.length) {
    newArr(i + 1) = arg(i)
  }
  newArr
}
Thanks & Regards
Parth Khatwani
On Mon, Apr 3, 2017 at 7:37 PM, KHATWANI PARTH BHARAT <
---------- Forwarded message ----------
Date: Fri, Mar 31, 2017 at 11:34 PM
Subject: Re: Trying to write the KMeans Clustering Using "Apache Mahout
Samsara"
ps1: this assumes row-wise construction of A based on a training set of m
n-dimensional points.
ps2: since we are doing multiple passes over A, it may make sense to make
sure it is committed to the Spark cache (by using the checkpoint API), if
Spark is used.
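For instance (an untested sketch; drmInput stands for however A was
constructed, e.g. via drmWrap):

import org.apache.mahout.math.drm.CacheHint
// pin A in the Spark cache so the repeated passes of the k-means loop
// do not recompute the whole input pipeline
val drmA = drmInput.checkpoint(CacheHint.MEMORY_ONLY)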
Post by Dmitriy Lyubimov
Here is the outline. For details of the APIs, please refer to the Samsara
manual [2]; I will not be repeating it.
Assume your training data input is an m x n matrix A. For simplicity, let's
assume it's a DRM with int row keys, i.e., DrmLike[Int].
First, classic k-means starts by selecting initial clusters, by sampling
them out. You can do that by using the sampling API [1], thus forming a k x n
in-memory matrix C (current centroids). C is therefore of Mahout's Matrix
type.
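In Samsara, that sampling could look like this (an untested sketch; drmA and
k stand for the training DRM and the chosen cluster count, with the usual
org.apache.mahout.math.drm._ and scalabindings imports assumed):

// sample k rows of A without replacement into an in-core k x n
// centroid matrix C, via the sampling api [1]
val inCoreC: Matrix = drmSampleKRows(drmA, k, false)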
You then proceed by alternating between cluster assignments and
recomputing the centroid matrix C till convergence, based on some test or
simply limited by an epoch count budget, your choice.
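The outer loop might be sketched like so (untested; maxEpochs, the epsilon,
and the recomputeCentroids helper are hypothetical placeholders for the
steps described below):

var inCoreC: Matrix = drmSampleKRows(drmA, k, false)
var epoch = 0
var converged = false
while (!converged && epoch < maxEpochs) {
  // assignment + aggregating-transpose update, per the steps below
  val newC = recomputeCentroids(drmA, inCoreC)
  // a simple convergence test on the total centroid shift
  converged = (newC - inCoreC).norm < 1e-6
  inCoreC = newC
  epoch += 1
}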
Cluster assignments: here, we go over the current generation of A and
recompute centroid indexes for each row in A. Once we recompute the index,
we put it into the row key. You can do that by assigning centroid indices to
keys of A using the operator mapBlock() (details in [2], [3], [4]). You also
need to broadcast C in order to be able to access it in an efficient manner
inside the mapBlock() closure. Examples of that are plenty given in [2].
Essentially, in mapBlock, you'd reform the row keys to reflect the cluster
index in C. While going over A, you'd have a "nearest neighbor" problem to
solve for the row of A and the centroids C. This is the bulk of the
computation really, and there are a few tricks there that can speed this
step up in both exact and approximate manner, but you can start with a
naive search.
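A minimal sketch of that assignment step (untested; findClosest is a
hypothetical naive nearest-centroid search over the rows of C):

val bcastC = drmBroadcast(inCoreC)
val drmAssigned = drmA.mapBlock() { case (keys, block) =>
  val c = bcastC.value
  for (row <- 0 until block.nrow) {
    // reform the row key to the index of the nearest centroid in C
    keys(row) = findClosest(block(row, ::), c)
  }
  keys -> block
}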
Once you have assigned centroids to the keys of matrix A, you'd want to do
an aggregating transpose of A to compute, essentially, the average of the
rows of A grouped by the centroid key. The trick is to do a computation of
(1|A)', which will result in a matrix of the shape (counts/sums of cluster
rows). This is the part I find difficult to explain without LaTeX graphics.
In Samsara, construction of (1|A)' corresponds to the DRM expression
(1 cbind A).t (again, see [2]).
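In the sketch's terms, with drmAssigned being A keyed by cluster indices,
that expression reads:

// aggregating transpose: rows of (1 | A) sharing the same key are summed
val drmB = (1 cbind drmAssigned).t // B is (n+1) x k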
So when you compute, say,
B = (1|A)',
then B is (n+1) x k, so each column contains a vector corresponding to a
cluster 1..k. In each such column, the first element would be the # of
points in the cluster, and the rest of it would correspond to the sum of all
points. So in order to arrive at an updated matrix C, we need to collect B
into memory, and slice out the counters (first row) from the rest of it.
C <- B(2:, :) with each row divided by B(1, :)
(watch out for empty clusters with 0 elements, this will cause lack of
convergence and NaNs in the newly computed C).
This operation obviously uses subblocking and row-wise iteration over B,
for which I am again making reference to [2].
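In code, the update could look roughly like this (untested sketch;
DenseMatrix is org.apache.mahout.math.DenseMatrix, and the slicing uses the
in-core scalabindings DSL):

val inCoreB = drmB.collect                   // (n+1) x k, now in memory
val counts = inCoreB(0, ::)                  // B(1,:)  -- points per cluster
val sums = inCoreB(1 until inCoreB.nrow, ::) // B(2:,:) -- per-cluster sums
val newC = new DenseMatrix(counts.size, sums.nrow) // k x n
for (cluster <- 0 until counts.size) {
  val cnt = counts(cluster)
  // guard empty clusters: dividing by 0 yields the NaNs mentioned above
  if (cnt > 0) newC(cluster, ::) := (sums(::, cluster) / cnt)
}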
[1] https://github.com/apache/mahout/blob/master/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala#L149
[2] Samsara manual, a bit dated but viable: http://apache.github.io/mahout/doc/ScalaSparkBindings.html
[3] scaladoc, again dated but largely viable for this purpose: http://apache.github.io/mahout/0.10.1/docs/mahout-math-scala/index.html
[4] mapBlock etc.: http://apache.github.io/mahout/0.10.1/docs/mahout-math-scala/index.html#org.apache.mahout.math.drm.RLikeDrmOps
On Fri, Mar 31, 2017 at 9:54 AM, KHATWANI PARTH BHARAT <
Post by KHATWANI PARTH BHARAT
@Dmitriy can you please again tell me the approach to move ahead.
Thanks
Parth Khatwani
On Fri, Mar 31, 2017 at 10:15 PM, KHATWANI PARTH BHARAT <
Post by KHATWANI PARTH BHARAT
Yes, I am unable to figure out the way ahead, like how to create the
augmented matrix A := (0|D) which you have mentioned.
Post by Dmitriy Lyubimov
On Fri, Mar 31, 2017 at 8:40 AM, KHATWANI PARTH BHARAT <
Post by KHATWANI PARTH BHARAT
Sir,
I am trying to write the k-means clustering algorithm using Mahout Samsara,
but I am a bit confused about how to leverage the Distributed Row Matrix
for the same. Can anybody