Discussion:
[jira] [Created] (MAHOUT-1750) Mahout DSL for Flink: Implement ABt
Alexey Grigorev (JIRA)
2015-06-24 11:07:04 UTC
Alexey Grigorev created MAHOUT-1750:
---------------------------------------

Summary: Mahout DSL for Flink: Implement ABt
Key: MAHOUT-1750
URL: https://issues.apache.org/jira/browse/MAHOUT-1750
Project: Mahout
Issue Type: Task
Components: Math
Affects Versions: 0.10.2
Reporter: Alexey Grigorev
Priority: Minor


Currently, ABt is expressed through AtB, which is not optimal; we need a dedicated implementation for ABt.
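To make the issue description concrete: AtB computes X'Y, so AB' can only be obtained from it as AtB(A', B'), which means transposing both operands first. Our reading is that this extra transposition is why the issue calls the current route "not optimal". The sketch below is a local, single-machine illustration on plain Scala arrays (not Mahout's `Matrix` or DRM API; `AbtIdentity` and its methods are hypothetical names). It compares a direct AB' with the AtB route.

```scala
// Illustration only: dense row-major matrices as plain arrays, not Mahout's Matrix/DRM API.
object AbtIdentity {
  type Mat = Array[Array[Double]]

  // Transpose of an (r x c) matrix; assumes all rows have the same length.
  def transpose(m: Mat): Mat =
    if (m.isEmpty) m
    else Array.tabulate(m(0).length, m.length)((i, j) => m(j)(i))

  // AtB-style product: X' * Y for X (n x p) and Y (n x q), giving a (p x q) result.
  def atb(x: Mat, y: Mat): Mat =
    Array.tabulate(x(0).length, y(0).length) { (i, j) =>
      x.indices.map(k => x(k)(i) * y(k)(j)).sum
    }

  // Direct AB': entry (i, j) is the dot product of row i of A with row j of B,
  // so neither operand has to be transposed first.
  def abt(a: Mat, b: Mat): Mat =
    Array.tabulate(a.length, b.length) { (i, j) =>
      a(i).indices.map(k => a(i)(k) * b(j)(k)).sum
    }

  // AB' routed through AtB: AB' = (A')'(B'), so both inputs are transposed up front.
  def abtViaAtb(a: Mat, b: Mat): Mat = atb(transpose(a), transpose(b))
}
```

Both functions produce the same matrix. The direct form only ever reads rows of A and rows of B, which is the access pattern a row-partitioned (DRM-style) layout already provides.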



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
Dmitriy Lyubimov (JIRA)
2015-10-23 22:25:27 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmitriy Lyubimov updated MAHOUT-1750:
-------------------------------------
Sprint: Sep/Oct-2015
Post by Alexey Grigorev (JIRA)
Mahout DSL for Flink: Implement ABt
-----------------------------------
Key: MAHOUT-1750
URL: https://issues.apache.org/jira/browse/MAHOUT-1750
Project: Mahout
Issue Type: Task
Components: Math
Affects Versions: 0.10.2
Reporter: Alexey Grigorev
Priority: Minor
Fix For: 0.12.0
Now ABt is expressed through AtB, which is not optimal, and we need to have a special implementation for ABt
Dmitriy Lyubimov (JIRA)
2015-10-23 22:26:27 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmitriy Lyubimov updated MAHOUT-1750:
-------------------------------------
Assignee: Alexey Grigorev
Post by Alexey Grigorev (JIRA)
Mahout DSL for Flink: Implement ABt
-----------------------------------
Key: MAHOUT-1750
URL: https://issues.apache.org/jira/browse/MAHOUT-1750
Project: Mahout
Issue Type: Task
Components: Math
Affects Versions: 0.10.2
Reporter: Alexey Grigorev
Assignee: Alexey Grigorev
Priority: Minor
Fix For: 0.12.0
Now ABt is expressed through AtB, which is not optimal, and we need to have a special implementation for ABt
Suneel Marthi (JIRA)
2015-10-23 22:43:27 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Suneel Marthi updated MAHOUT-1750:
----------------------------------
Fix Version/s: 0.13.0 (was: 0.12.0)
Post by Alexey Grigorev (JIRA)
Mahout DSL for Flink: Implement ABt
-----------------------------------
Key: MAHOUT-1750
URL: https://issues.apache.org/jira/browse/MAHOUT-1750
Project: Mahout
Issue Type: Task
Components: Math
Affects Versions: 0.10.2
Reporter: Alexey Grigorev
Assignee: Alexey Grigorev
Priority: Minor
Fix For: 0.13.0
Now ABt is expressed through AtB, which is not optimal, and we need to have a special implementation for ABt
Dmitriy Lyubimov (JIRA)
2015-11-14 23:56:11 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmitriy Lyubimov updated MAHOUT-1750:
-------------------------------------
Sprint: Nov/Dec-2015 (was: Sep/Oct-2015)
Post by Alexey Grigorev (JIRA)
Mahout DSL for Flink: Implement ABt
-----------------------------------
Key: MAHOUT-1750
URL: https://issues.apache.org/jira/browse/MAHOUT-1750
Project: Mahout
Issue Type: Task
Components: Math
Affects Versions: 0.10.2
Reporter: Alexey Grigorev
Assignee: Suneel Marthi
Priority: Minor
Fix For: 0.12.0
Now ABt is expressed through AtB, which is not optimal, and we need to have a special implementation for ABt
Andrew Palumbo (JIRA)
2016-03-23 22:22:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo reassigned MAHOUT-1750:
--------------------------------------

Assignee: Andrew Palumbo (was: Suneel Marthi)
Post by Alexey Grigorev (JIRA)
Mahout DSL for Flink: Implement ABt
-----------------------------------
Key: MAHOUT-1750
URL: https://issues.apache.org/jira/browse/MAHOUT-1750
Project: Mahout
Issue Type: Task
Components: Math
Affects Versions: 0.10.2
Reporter: Alexey Grigorev
Assignee: Andrew Palumbo
Priority: Minor
Fix For: 0.12.0
Now ABt is expressed through AtB, which is not optimal, and we need to have a special implementation for ABt
Andrew Palumbo (JIRA)
2016-03-29 02:38:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on MAHOUT-1750 started by Andrew Palumbo.
----------------------------------------------
Saikat Kanjilal (JIRA)
2016-03-30 15:37:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15218169#comment-15218169 ]

Saikat Kanjilal commented on MAHOUT-1750:
-----------------------------------------

Andrew,
I'd like to dive into Mahout a bit. Can I help with this issue?
Dmitriy Lyubimov (JIRA)
2016-03-30 17:18:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmitriy Lyubimov updated MAHOUT-1750:
-------------------------------------
Sprint: Jan/Feb-2016 (was: Nov/Dec-2015)
Dmitriy Lyubimov (JIRA)
2016-03-30 17:25:26 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmitriy Lyubimov updated MAHOUT-1750:
-------------------------------------
Sprint: Mar/Apr-2016 (was: Jan/Feb-2016)
Dmitriy Lyubimov (JIRA)
2016-03-30 17:26:26 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmitriy Lyubimov updated MAHOUT-1750:
-------------------------------------
Sprint: Jan/Feb-2016 (was: Mar/Apr-2016)
Andrew Palumbo (JIRA)
2016-04-08 08:25:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15231856#comment-15231856 ]

Andrew Palumbo commented on MAHOUT-1750:
----------------------------------------

Tracking the problems with what should be a rather straightforward implementation here:

https://gist.github.com/andrewpalumbo/c42d41074410752a8712446dcd1f86dc
Suneel Marthi (JIRA)
2016-04-08 08:30:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Suneel Marthi updated MAHOUT-1750:
----------------------------------
Comment: was deleted

(was: Andrew,
I'd like to dive into Mahout a bit. Can I help with this issue?)
Andrew Palumbo (JIRA)
2016-04-08 08:47:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15231856#comment-15231856 ]

Andrew Palumbo edited comment on MAHOUT-1750 at 4/8/16 8:46 AM:
----------------------------------------------------------------

Tracking the problems with what should be a rather straightforward implementation here:

https://gist.github.com/andrewpalumbo/c42d41074410752a8712446dcd1f86dc

Tracking the development here: https://github.com/andrewpalumbo/mahout/blob/MAHOUT-1750/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpABt.scala


was (Author: andrew_palumbo):
Tracking the problems with what should be a rather straightforward implementation here:

https://gist.github.com/andrewpalumbo/c42d41074410752a8712446dcd1f86dc
Andrew Palumbo (JIRA)
2016-04-08 16:58:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15232484#comment-15232484 ]

Andrew Palumbo commented on MAHOUT-1750:
----------------------------------------
ASF GitHub Bot (JIRA)
2016-04-10 01:09:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15233804#comment-15233804 ]

ASF GitHub Bot commented on MAHOUT-1750:
----------------------------------------

GitHub user andrewpalumbo opened a pull request:

https://github.com/apache/mahout/pull/215

MAHOUT-1750 - For Review. FlinkOpABt too many operations to implement?

While (partially) implementing `FlinkOpABt`, it seems that there may be too many Flink map/reduce/group/etc. operations to finish using this method. **NOTE**: this is unfinished and is not numerically correct.

Currently, when testing, I am getting Kryo stack overflow exceptions, which, as I understand it, are often caused by a long chain of operations:
```
Job execution failed.
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:716)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:662)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:662)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.StackOverflowError
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:74)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
{...}
```

Any comments are appreciated.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/andrewpalumbo/mahout MAHOUT-1750

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/mahout/pull/215.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #215

----
commit fab6bb86fa1af0a7efce2300dcada45c32b35677
Author: Andrew Palumbo <***@apache.org>
Date: 2016-03-28T01:34:34Z

start Stealing DL's Spark code

commit 7e1a8bbaaca71036e9c0247f07a40368432f592b
Author: Andrew Palumbo <***@apache.org>
Date: 2016-03-28T01:55:01Z

wip: Spark combineByKey => Flink groupReduce

commit 31a644b36ed92c5aae3b80d5e8fbca7dd8a1d87a
Author: Andrew Palumbo <***@apache.org>
Date: 2016-03-28T22:38:31Z

implement first step of the combiner

commit 7dc73890ac5eeaad1ba38ab799b19b989c6ff918
Author: Andrew Palumbo <***@apache.org>
Date: 2016-03-28T22:53:09Z

Add in type information for maps, etc.

commit 5c60167f92a32b4c0751f6bf8dc12a838e1cde78
Author: Andrew Palumbo <***@apache.org>
Date: 2016-03-28T23:50:12Z

Merge branch 'flink-binding' of https://git-wip-us.apache.org/repos/asf/mahout into MAHOUT-1750

commit 9b01a4a498c080e6063387608253890a13e77ad4
Author: Andrew Palumbo <***@apache.org>
Date: 2016-03-29T02:48:43Z

remove all unnecessary type info still not building with GroupCombineFunctions

commit 37391a5a51b6803fdf893349a45a61903a6928e5
Author: Andrew Palumbo <***@apache.org>
Date: 2016-03-30T00:07:26Z

wip

commit d65e23b91f3c055c80de432dc9699c62512dd34e
Author: Andrew Palumbo <***@apache.org>
Date: 2016-03-30T00:07:40Z

Merge branch 'flink-binding' of https://git-wip-us.apache.org/repos/asf/mahout into MAHOUT-1750

commit 3127d3218d9fa0fba35c9c240db63e71b00c8a32
Author: Andrew Palumbo <***@apache.org>
Date: 2016-03-30T01:01:15Z

wip: still problems with error: object creation impossible, since method mapPartition in class RichMapPartitionFunction of type (x: Iterable[(Array[K1], org.apache.mahout.math.Matrix)], x: org.apache.flink.util.Collector[(Int, Array[K1], org.apache.mahout.math.Matrix)])Unit is not defined

commit c19493b058e6d92f10fadeb32be9bafb14e7c671
Author: Andrew Palumbo <***@apache.org>
Date: 2016-04-08T05:17:03Z

WIP

commit b9ed381c2178392e785c4835adfac53b8d0ec5fb
Author: Andrew Palumbo <***@apache.org>
Date: 2016-04-08T08:41:58Z

Revert "WIP"

This reverts commit c19493b058e6d92f10fadeb32be9bafb14e7c671.

commit 3311e89fbef3751476dbf113f680330b61f64160
Author: Andrew Palumbo <***@apache.org>
Date: 2016-04-08T08:51:20Z

Merge branch 'flink-binding' of https://git-wip-us.apache.org/repos/asf/mahout into MAHOUT-1750

commit db21e7b12a0b4e05ff27fa9031fd8bf3e5f22bda
Author: Andrew Palumbo <***@apache.org>
Date: 2016-04-08T17:02:05Z

Use java.lang.Iterable instead of scala.Iterable- this fixes the object creation issue.

commit d3d60fadf50ceb12f559d4c94c9402735c8a7d30
Author: Andrew Palumbo <***@apache.org>
Date: 2016-04-08T21:45:03Z

wip

commit 0a83e122c82eb0584a00510f7fb571432e834064
Author: Andrew Palumbo <***@apache.org>
Date: 2016-04-08T22:11:26Z

wip: error is

Error:(180, 23) overriding method reduce in trait ReduceFunction of type (x: (Array[K], org.apache.mahout.math.Matrix), x: (Array[K], org.apache.mahout.math.Matrix))(Array[K], org.apache.mahout.math.Matrix);
method reduce has incompatible type
def reduce(mx1: (Array[K], Matrix), mx2: (Array[K], Matrix)) {
^

commit 97a6aea7c4fd0abc719605991322ac4acb758ea8
Author: Andrew Palumbo <***@apache.org>
Date: 2016-04-08T22:20:56Z

wip: compiling now

commit 088d925ba3cc5e93e25ad37f8e2fcf4d4698595f
Author: Andrew Palumbo <***@apache.org>
Date: 2016-04-09T00:44:22Z

not returning any partitions. failing at dimensionality count in CheckpointedDrmFlink

commit 82ae321b85b84e478152f208059f9c8146d814c8
Author: Andrew Palumbo <***@apache.org>
Date: 2016-04-09T01:01:05Z

wip: need to make combiner into a merger

commit 3c7c2ff937206b0882f6395058a8646ed1c0cdaa
Author: Andrew Palumbo <***@apache.org>
Date: 2016-04-09T14:29:38Z

wip: comments, combiner

commit cd2b59ac96b63c93ae18619cdf22b6c2742e93d3
Author: Andrew Palumbo <***@apache.org>
Date: 2016-04-09T19:58:29Z

wip: kryo error

commit d313ebc1618ae5cb44b5161b492bb03baba8216d
Author: Andrew Palumbo <***@apache.org>
Date: 2016-04-10T00:58:13Z

Kryo stackOverflow

----
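Two of the commits above record Scala/Java interop errors: `mapPartition` was "not defined" until its parameter was typed as `java.lang.Iterable` (commit db21e7b), and `reduce` "has incompatible type" when written in procedure syntax `def reduce(...) {` (commit 0a83e12), which appears to be why the next commit compiles. A minimal sketch of both fixes follows. It uses hypothetical stand-in traits `CombineLike` and `ReduceLike` in place of Flink's `GroupCombineFunction` and `ReduceFunction`, and `java.util.List` in place of Flink's `Collector`, so that it runs without Flink on the classpath. It assumes Scala 2.13+ for `scala.jdk.CollectionConverters`; the diff's `scala.collection.JavaConverters._` provides the same `asScala` on older versions.

```scala
import scala.jdk.CollectionConverters._

// Hypothetical stand-ins for Java-style SAM interfaces (not Flink's actual API):
// the real Flink interfaces take an org.apache.flink.util.Collector instead of java.util.List.
trait CombineLike[IN, OUT] {
  def combine(values: java.lang.Iterable[IN], out: java.util.List[OUT]): Unit
}

trait ReduceLike[T] {
  def reduce(v1: T, v2: T): T
}

object InteropSketch {
  // The parameter must be java.lang.Iterable. Declaring scala.Iterable instead would only add
  // an overload, leaving the abstract method unimplemented ("object creation impossible").
  val sumCombiner: CombineLike[Int, Int] = new CombineLike[Int, Int] {
    def combine(values: java.lang.Iterable[Int], out: java.util.List[Int]): Unit = {
      out.add(values.asScala.sum)
      ()
    }
  }

  // Explicit result type with `=`: Scala 2 procedure syntax `def reduce(a: T, b: T) { ... }`
  // infers Unit, which does not match the interface's T-returning method.
  val sumReducer: ReduceLike[Int] = new ReduceLike[Int] {
    def reduce(v1: Int, v2: Int): Int = v1 + v2
  }
}
```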
ASF GitHub Bot (JIRA)
2016-04-10 13:00:27 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234108#comment-15234108 ]

ASF GitHub Bot commented on MAHOUT-1750:
----------------------------------------

Github user rmetzger commented on the pull request:

https://github.com/apache/mahout/pull/215#issuecomment-207982475

How can I reproduce the issue?
ASF GitHub Bot (JIRA)
2016-04-10 13:15:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234115#comment-15234115 ]

ASF GitHub Bot commented on MAHOUT-1750:
----------------------------------------

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/mahout/pull/215#discussion_r59133538

--- Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpABt.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.flinkbindings.blas
+
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.util.Collector
+import org.apache.mahout.logging._
+import org.apache.mahout.math.drm.{BlockifiedDrmTuple, DrmTuple}
+import org.apache.mahout.math.drm.logical.{OpAB, OpABt}
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.{Matrix, SparseMatrix, SparseRowMatrix}
+import org.apache.mahout.flinkbindings._
+import org.apache.mahout.flinkbindings.drm._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.mahout.flinkbindings._
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
+import org.apache.mahout.math.{Matrix, Vector}
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.logical.OpABt
+import org.apache.mahout.math.scalabindings.RLikeOps._
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+import org.apache.flink.api.scala.createTypeInformation
+
+/** Contains DataSet plans for ABt operator */
+object FlinkOpABt {
+
+ private final implicit val log = getLog(FlinkOpABt.getClass)
+
+ /**
+ * General entry point for AB' operator.
+ *
+ * @param operator the AB' operator
+ * @param srcA A source DataSet
+ * @param srcB B source DataSet
+ * @tparam K
+ */
+ def abt[K: ClassTag: TypeInformation](
+ operator: OpABt[K],
+ srcA: FlinkDrm[K],
+ srcB: FlinkDrm[Int]): FlinkDrm[K] = {
+
+ debug("operator AB'(Flink)")
+ abt_nograph[K](operator, srcA, srcB)
+ }
+
+ /**
+ * Computes AB'
+ *
+ * General idea here is that we split both A and B vertically into blocks (one block per split),
+ * then compute cartesian join of the blocks of both data sets. This creates tuples of the form of
+ * (A-block, B-block). We enumerate A-blocks and transform this into (A-block-id, A-block, B-block)
+ * and then compute A-block %*% B-block', thus producing tuples (A-block-id, AB'-block).
+ *
+ * The next step is to group the above tuples by A-block-id and stitch all AB'-blocks in the group
+ * horizontally, forming a single vertical block of the final product AB'.
+ *
+ * This logic is complicated a little by the fact that we have to keep block row and column keys
+ * so that the stitching of AB'-blocks happens according to integer row indices of the B input.
+ */
+ private[flinkbindings] def abt_nograph[K: ClassTag: TypeInformation](
+ operator: OpABt[K],
+ srcA: FlinkDrm[K],
+ srcB: FlinkDrm[Int]): FlinkDrm[K] = {
+
+ // Blockify everything.
+ val blocksA = srcA.asBlockified
+ val blocksB = srcB.asBlockified
+
+ val prodNCol = operator.ncol
+ val prodNRow = operator.nrow
+
+
+ // blockwise multiplication function
+ def mmulFunc(tupleA: (Array[K], Matrix), tupleB: (Array[Int], Matrix)): (Array[K], Array[Int], Matrix) = {
+ val (keysA, blockA) = tupleA
+ val (keysB, blockB) = tupleB
+
+ var ms = traceDo(System.currentTimeMillis())
+
+ // We need to send keysB to the aggregator in order to know which columns are being updated.
+ val result = (keysA, keysB, blockA %*% blockB.t)
+
+ ms = traceDo(System.currentTimeMillis() - ms.get)
+ trace(
+ s"block multiplication of(${blockA.nrow}x${blockA.ncol} x ${blockB.ncol}x${blockB.nrow} is completed in $ms " +
+ "ms.")
+ trace(s"block multiplication types: blockA: ${blockA.getClass.getName}(${blockA.t.getClass.getName}); " +
+ s"blockB: ${blockB.getClass.getName}.")
+
+ result.asInstanceOf[(Array[K], Array[Int], Matrix)]
+ }
+
+
+ implicit val typeInformation = createTypeInformation[(Array[K], Matrix)]
+ implicit val typeInformation2 = createTypeInformation[(Int, (Array[K], Array[Int], Matrix))]
+ implicit val typeInformation3 = createTypeInformation[(Array[K], Array[Int], Matrix)]
+
+ val blockwiseMmulDataSet =
+
+ // Combine blocks pairwise.
+ pairwiseApply(blocksA.asBlockified.ds, blocksB.asBlockified.ds, mmulFunc)
+
+ // Now reduce proper product blocks.
+ // group by the partition key
+ .groupBy(0)
+
+ // combine as transpose
+ .combineGroup(new GroupCombineFunction[(Int, (Array[K], Array[Int], Matrix)), (Array[K], Matrix)] {
+
+ def combine(values: java.lang.Iterable[(Int, (Array[K], Array[Int], Matrix))],
+ out: Collector[(Array[K], Matrix)]): Unit = {
+ val tuple = values.iterator().next
+ val rowKeys = tuple._2._1
+ val colKeys = tuple._2._2
+ val block = tuple._2._3
+
+ val comb = new SparseMatrix(prodNCol, block.nrow).t
+ for ((col, i) <- colKeys.zipWithIndex) comb(::, col) := block(::, i)
+ val res = (rowKeys, comb)
+
+ out.collect(res)
+ }
+ })
+
+ // reduce into a final Blockified matrix
+ .reduce(new ReduceFunction[(Array[K], Matrix)] {
+
+ def reduce(mx1: (Array[K], Matrix), mx2: (Array[K], Matrix)): (Array[K], Matrix) = {
+ mx1._2 += mx2._2
+ mx1
+ }
+ })
+
+
+ new BlockifiedFlinkDrm(ds = blockwiseMmulDataSet, ncol = prodNCol)
+
+ }
+ /**
+ * This function tries to use join instead of cartesian to group blocks together without bloating
+ * the number of partitions. Hope is that we can apply pairwise reduction of block pair right away
+ * so if the data to one of the join parts is streaming, the result is still fitting to memory,
+ * since result size is much smaller than the operands.
+ *
+ * @param blocksA blockified DataSet for A
+ * @param blocksB blockified DataSet for B
+ * @param blockFunc a function over (blockA, blockB). Implies `blockA %*% blockB.t` but perhaps may be
+ * switched to another scheme based on which of the sides, A or B, is bigger.
+ */
+ private def pairwiseApply[K1: ClassTag: TypeInformation, K2: ClassTag: TypeInformation,
+ T: ClassTag: TypeInformation]
+ ( blocksA: BlockifiedDrmDataSet[K1], blocksB: BlockifiedDrmDataSet[K2], blockFunc:
+ ((Array[K1], Matrix), (Array[K2], Matrix)) =>
+ (Array[K1], Array[Int], Matrix) ): DataSet[(Int, (Array[K1], Array[Int], Matrix))] = {
+
+ implicit val typeInformationA = createTypeInformation[(Int, Array[K1], Matrix)]
+ implicit val typeInformationProd = createTypeInformation[(Int, (Array[K1], Array[Int], Matrix))]
+
+ // We will be joining blocks in B to blocks in A using A-partition as a key.
+
+ // Prepare A side.
+ val blocksAKeyed = blocksA.mapPartition( new RichMapPartitionFunction[(Array[K1], Matrix),
+ (Int, Array[K1], Matrix)] {
+ // partition number
+ var part: Int = 0
+
+ // get the index of the partition
+ override def open(params: Configuration): Unit = {
+ part = getRuntimeContext.getIndexOfThisSubtask
+ }
+
+ // bind the partition number to each keySet/block
+ def mapPartition(values: java.lang.Iterable[(Array[K1], Matrix)], out: Collector[(Int, Array[K1], Matrix)]): Unit = {
+
+ val blockIter = values.iterator()
+ if (blockIter.hasNext()) {
+ val r = part -> blockIter.next
+ require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and A of AB'")
+ out.collect((r._1, r._2._1, r._2._2))
+ }
+ }
+ })
+
+ implicit val typeInformationB = createTypeInformation[(Int, (Array[K2], Matrix))]
+
+ // Prepare B-side.
+// val blocksBKeyed = blocksB.flatMap(bTuple => for (blockKey <- (0 until aParts).view) yield blockKey -> bTuple )
+
+ val blocksBKeyed = blocksB.mapPartition( new RichMapPartitionFunction[(Array[K2], Matrix),
+ (Int, Array[K2], Matrix)] {
+ // partition number
+ var part: Int = 0
+
+ // get the index of the partition
+ override def open(params: Configuration): Unit = {
+ part = getRuntimeContext.getIndexOfThisSubtask
+ }
+
+ // bind the partition number to each keySet/block
+ def mapPartition(values: java.lang.Iterable[(Array[K2], Matrix)], out: Collector[(Int, Array[K2], Matrix)]): Unit = {
+
+ val blockIter = values.iterator()
+ if (blockIter.hasNext()) {
+ val r = part -> blockIter.next
+ require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and A of AB'")
+ out.collect((r._1, r._2._1, r._2._2))
+ }
+ }
+ })
+
+
+ implicit val typeInformationJ = createTypeInformation[(Int, ((Array[K1], Matrix),(Int, (Array[K2], Matrix))))]
+ implicit val typeInformationJprod = createTypeInformation[(Int, T)]
+
+
+ // Perform the inner join.
+ val joined = blocksAKeyed.join(blocksBKeyed).where(0).equalTo(0)
+
+ // Apply product function which should produce smaller products. Hopefully, this streams blockB's in
+ val mapped = joined.rebalance().map{tuple => tuple._1._1 ->
--- End diff --

Do you think rebalance() here is actually eliminating empty partitions?
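The plan in the `abt_nograph` doc comment can be sketched locally: pair every A block with every B block, compute blockA %*% blockB', scatter the partial product's columns to B's integer row keys, and group by A block. The pairing here uses the replicated-key join from the commented-out `flatMap` line in `pairwiseApply`. One property of equi-joins is relevant to that function: an inner join on a key pairs only records with equal keys. B blocks therefore have to be replicated under every A key for each A block to meet each B block; keying B by its own subtask index pairs only blocks whose indices happen to match. In this sketch, plain Scala collections stand in for Flink DataSets and arrays stand in for Mahout matrices. `BlockwiseAbt`, `Block` and `mulBt` are hypothetical names, not Mahout or Flink API.

```scala
// Local, single-machine sketch of a blockwise AB' plan; collections stand in for Flink DataSets.
object BlockwiseAbt {
  type Mat = Array[Array[Double]]

  // A row block: the keys of its rows plus the dense rows themselves (hypothetical type).
  final case class Block[K](keys: Array[K], rows: Mat)

  // blockA %*% blockB.t for dense row-major blocks.
  private def mulBt(a: Mat, b: Mat): Mat =
    Array.tabulate(a.length, b.length) { (i, j) =>
      a(i).indices.map(k => a(i)(k) * b(j)(k)).sum
    }

  def abt[K](blocksA: Seq[Block[K]], blocksB: Seq[Block[Int]], prodNCol: Int): Seq[Block[K]] = {
    // Key every A block by its position (the role the partition index plays in the diff).
    val aKeyed: Seq[(Int, Block[K])] =
      blocksA.zipWithIndex.map { case (blk, idx) => idx -> blk }

    // Replicate every B block under every A key, so the equi-join below meets all pairs.
    val bKeyed: Seq[(Int, Block[Int])] =
      for (aIdx <- blocksA.indices; bBlk <- blocksB) yield aIdx -> bBlk

    // Inner join on the key, then multiply: (aIdx, (A row keys, B row keys, partial AB')).
    val partials = for {
      (aIdx, aBlk) <- aKeyed
      (bIdx, bBlk) <- bKeyed if bIdx == aIdx
    } yield aIdx -> ((aBlk.keys, bBlk.keys, mulBt(aBlk.rows, bBlk.rows)))

    // Group by A key; place each partial's columns at B's integer row keys and merge
    // all partials of one A block into a single (rows x prodNCol) block by addition.
    partials.groupBy(_._1).toSeq.sortBy(_._1).map { case (_, group) =>
      val rowKeys = group.head._2._1
      val stitched = Array.fill(rowKeys.length, prodNCol)(0.0)
      for {
        (_, (_, colKeys, part)) <- group
        i <- part.indices
        (col, j) <- colKeys.zipWithIndex
      } stitched(i)(col) += part(i)(j)
      Block(rowKeys, stitched)
    }
  }
}
```

Because B's row keys are distinct across blocks, each output column receives exactly one contribution, so the additive merge here amounts to placement. The merge step still has to see every partial of a group, not just the first one.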
ASF GitHub Bot (JIRA)
2016-04-10 16:50:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234189#comment-15234189 ]

ASF GitHub Bot commented on MAHOUT-1750:
----------------------------------------

Github user andrewpalumbo commented on a diff in the pull request:

https://github.com/apache/mahout/pull/215#discussion_r59137058

--- Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpABt.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.flinkbindings.blas
+
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.util.Collector
+import org.apache.mahout.logging._
+import org.apache.mahout.math.drm.{BlockifiedDrmTuple, DrmTuple}
+import org.apache.mahout.math.drm.logical.{OpAB, OpABt}
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.{Matrix, SparseMatrix, SparseRowMatrix, Vector}
+import org.apache.mahout.flinkbindings._
+import org.apache.mahout.flinkbindings.drm._
+import org.apache.flink.configuration.Configuration
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
+import org.apache.mahout.math.drm._
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+import org.apache.flink.api.scala.createTypeInformation
+
+/** Contains DataSet plans for ABt operator */
+object FlinkOpABt {
+
+ private final implicit val log = getLog(FlinkOpABt.getClass)
+
+ /**
+ * General entry point for AB' operator.
+ *
+ * @param operator the AB' operator
+ * @param srcA A source DataSet
+ * @param srcB B source DataSet
+ * @tparam K row key type of A and of the product AB'
+ */
+ def abt[K: ClassTag: TypeInformation](
+ operator: OpABt[K],
+ srcA: FlinkDrm[K],
+ srcB: FlinkDrm[Int]): FlinkDrm[K] = {
+
+ debug("operator AB'(Flink)")
+ abt_nograph[K](operator, srcA, srcB)
+ }
+
+ /**
+ * Computes AB'
+ *
+ * The general idea is that we split both A and B vertically into blocks (one block per split),
+ * then compute the cartesian product of the blocks of both data sets. This creates tuples of the form
+ * (A-block, B-block). We enumerate A-blocks and transform this into (A-block-id, A-block, B-block)
+ * and then compute A-block %*% B-block', thus producing tuples (A-block-id, AB'-block).
+ *
+ * The next step is to group the above tuples by A-block-id and stitch all AB'-blocks in the group
+ * horizontally, forming a single vertical block of the final product AB'.
+ *
+ * This logic is complicated a little by the fact that we have to keep block row and column keys
+ * so that the stitching of AB'-blocks happens according to integer row indices of the B input.
+ */
+ private[flinkbindings] def abt_nograph[K: ClassTag: TypeInformation](
+ operator: OpABt[K],
+ srcA: FlinkDrm[K],
+ srcB: FlinkDrm[Int]): FlinkDrm[K] = {
+
+ // Blockify everything.
+ val blocksA = srcA.asBlockified
+ val blocksB = srcB.asBlockified
+
+ val prodNCol = operator.ncol
+ val prodNRow = operator.nrow
+
+
+ // blockwise multiplication function
+ def mmulFunc(tupleA: (Array[K], Matrix), tupleB: (Array[Int], Matrix)): (Array[K], Array[Int], Matrix) = {
+ val (keysA, blockA) = tupleA
+ val (keysB, blockB) = tupleB
+
+ var ms = traceDo(System.currentTimeMillis())
+
+ // We need to send keysB to the aggregator in order to know which columns are being updated.
+ val result = (keysA, keysB, blockA %*% blockB.t)
+
+ ms = traceDo(System.currentTimeMillis() - ms.get)
+ trace(
+ s"block multiplication of (${blockA.nrow}x${blockA.ncol} x ${blockB.ncol}x${blockB.nrow}) is completed in $ms " +
+ "ms.")
+ trace(s"block multiplication types: blockA: ${blockA.getClass.getName}(${blockA.t.getClass.getName}); " +
+ s"blockB: ${blockB.getClass.getName}.")
+
+ result.asInstanceOf[(Array[K], Array[Int], Matrix)]
+ }
+
+
+ implicit val typeInformation = createTypeInformation[(Array[K], Matrix)]
+ implicit val typeInformation2 = createTypeInformation[(Int, (Array[K], Array[Int], Matrix))]
+ implicit val typeInformation3 = createTypeInformation[(Array[K], Array[Int], Matrix)]
+
+ val blockwiseMmulDataSet =
+
+ // Combine blocks pairwise.
+ pairwiseApply(blocksA.asBlockified.ds, blocksB.asBlockified.ds, mmulFunc)
+
+ // Now reduce proper product blocks.
+ // group by the partition key
+ .groupBy(0)
+
+ // combine as transpose
+ .combineGroup(new GroupCombineFunction[(Int, (Array[K], Array[Int], Matrix)), (Array[K], Matrix)] {
+
+ def combine(values: java.lang.Iterable[(Int, (Array[K], Array[Int], Matrix))],
+ out: Collector[(Array[K], Matrix)]): Unit = {
+ val tuple = values.iterator().next
+ val rowKeys = tuple._2._1
+ val colKeys = tuple._2._2
+ val block = tuple._2._3
+
+ val comb = new SparseMatrix(prodNCol, block.nrow).t
+ for ((col, i) <- colKeys.zipWithIndex) comb(::, col) := block(::, i)
+ val res = (rowKeys, comb)
+
+ out.collect(res)
+ }
+ })
+
+ // reduce into a final Blockified matrix
+ .reduce(new ReduceFunction[(Array[K], Matrix)] {
+
+ def reduce(mx1: (Array[K], Matrix), mx2: (Array[K], Matrix)): (Array[K], Matrix) = {
+ mx1._2 += mx2._2
+ mx1
+ }
+ })
+
+
+ new BlockifiedFlinkDrm(ds = blockwiseMmulDataSet, ncol = prodNCol)
+
+ }
+ /**
+ * This function tries to use a join instead of a cartesian product to group blocks together without
+ * bloating the number of partitions. The hope is that we can apply the pairwise reduction of each block
+ * pair right away, so that if the data on one side of the join is streamed, the result still fits
+ * in memory, since the result is much smaller than the operands.
+ *
+ * @param blocksA blockified DataSet for A
+ * @param blocksB blockified DataSet for B
+ * @param blockFunc a function over (blockA, blockB). Implies `blockA %*% blockB.t` but may perhaps be
+ * switched to another scheme based on which of the sides, A or B, is bigger.
+ */
+ private def pairwiseApply[K1: ClassTag: TypeInformation, K2: ClassTag: TypeInformation,
+ T: ClassTag: TypeInformation]
+ ( blocksA: BlockifiedDrmDataSet[K1], blocksB: BlockifiedDrmDataSet[K2], blockFunc:
+ ((Array[K1], Matrix), (Array[K2], Matrix)) =>
+ (Array[K1], Array[Int], Matrix) ): DataSet[(Int, (Array[K1], Array[Int], Matrix))] = {
+
+ implicit val typeInformationA = createTypeInformation[(Int, Array[K1], Matrix)]
+ implicit val typeInformationProd = createTypeInformation[(Int, (Array[K1], Array[Int], Matrix))]
+
+ // We will be joining blocks in B to blocks in A using A-partition as a key.
+
+ // Prepare A side.
+ val blocksAKeyed = blocksA.mapPartition( new RichMapPartitionFunction[(Array[K1], Matrix),
+ (Int, Array[K1], Matrix)] {
+ // partition number
+ var part: Int = 0
+
+ // get the index of the partition
+ override def open(params: Configuration): Unit = {
+ part = getRuntimeContext.getIndexOfThisSubtask
+ }
+
+ // bind the partition number to each keySet/block
+ def mapPartition(values: java.lang.Iterable[(Array[K1], Matrix)], out: Collector[(Int, Array[K1], Matrix)]): Unit = {
+
+ val blockIter = values.iterator()
+ if (blockIter.hasNext()) {
+ val r = part -> blockIter.next
+ require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition for A of AB'")
+ out.collect((r._1, r._2._1, r._2._2))
+ }
+ }
+ })
+
+ implicit val typeInformationB = createTypeInformation[(Int, (Array[K2], Matrix))]
+
+ // Prepare B-side.
+// val blocksBKeyed = blocksB.flatMap(bTuple => for (blockKey <- (0 until aParts).view) yield blockKey -> bTuple )
+
+ val blocksBKeyed = blocksB.mapPartition( new RichMapPartitionFunction[(Array[K2], Matrix),
+ (Int, Array[K2], Matrix)] {
+ // partition number
+ var part: Int = 0
+
+ // get the index of the partition
+ override def open(params: Configuration): Unit = {
+ part = getRuntimeContext.getIndexOfThisSubtask
+ }
+
+ // bind the partition number to each keySet/block
+ def mapPartition(values: java.lang.Iterable[(Array[K2], Matrix)], out: Collector[(Int, Array[K2], Matrix)]): Unit = {
+
+ val blockIter = values.iterator()
+ if (blockIter.hasNext()) {
+ val r = part -> blockIter.next
+ require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition for B of AB'")
+ out.collect((r._1, r._2._1, r._2._2))
+ }
+ }
+ })
+
+
+ implicit val typeInformationJ = createTypeInformation[(Int, ((Array[K1], Matrix),(Int, (Array[K2], Matrix))))]
+ implicit val typeInformationJprod = createTypeInformation[(Int, T)]
+
+
+ // Perform the inner join.
+ val joined = blocksAKeyed.join(blocksBKeyed).where(0).equalTo(0)
+
+ // Apply product function which should produce smaller products. Hopefully, this streams blockB's in
+ val mapped = joined.rebalance().map{tuple => tuple._1._1 ->
--- End diff --

@smarthi Not sure, I was testing a theory with that. I should take that out.
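The `abt_nograph` doc comment in the diff above describes the scheme: split A and B into row blocks, multiply each A block by the transpose of each B block, group the partial products by A block, and stitch them horizontally using B's integer row keys as column indices. The sketch below models that pipeline with plain Scala collections instead of Flink `DataSet`s and `Array[Array[Double]]` instead of Mahout `Matrix` blocks. `BlockwiseABtSketch`, `blockify` and `timesTransposed` are hypothetical names, and replicating every B block to every A key (as the commented-out `flatMap` line in `pairwiseApply` suggests) is an assumption about the intended design.

```scala
// Dependency-free sketch of the blockwise AB' pipeline. Plain collections stand in for
// Flink DataSets and dense row-major arrays for Mahout Matrix blocks. Names are hypothetical.
object BlockwiseABtSketch {
  type Rows = Array[Array[Double]]
  // A row block: the integer row keys it covers and the rows themselves.
  type Block = (Array[Int], Rows)

  // Split a matrix into row blocks of at most `rowsPerBlock` rows.
  def blockify(m: Rows, rowsPerBlock: Int): List[Block] =
    m.indices.grouped(rowsPerBlock).map(idx => (idx.toArray, idx.map(m(_)).toArray)).toList

  // a %*% b.t for dense row-major blocks: entry (i, j) is the dot product of a(i) and b(j).
  def timesTransposed(a: Rows, b: Rows): Rows =
    a.map(ra => b.map(rb => ra.zip(rb).map { case (x, y) => x * y }.sum))

  def abt(blocksA: List[Block], blocksB: List[Block], nrowB: Int): List[Block] = {
    // Key each A block by its position (stands in for the subtask index).
    val aKeyed = blocksA.zipWithIndex.map { case (blk, part) => (part, blk) }
    // Assumption: replicate every B block to every A key so that each A block meets each B block.
    val bKeyed = for (part <- aKeyed.indices.toList; blk <- blocksB) yield (part, blk)
    // Inner join on the key, then multiply: (A key, (A row keys, B row keys, partial product)).
    val products = for ((pa, (keysA, a)) <- aKeyed; (pb, (keysB, b)) <- bKeyed if pa == pb)
      yield (pa, (keysA, keysB, timesTransposed(a, b)))
    // Group by A key and stitch partial products horizontally, using B row keys as columns.
    products.groupBy(_._1).toList.sortBy(_._1).map { case (_, group) =>
      val keysA = group.head._2._1
      val out = Array.fill(keysA.length, nrowB)(0.0)
      for ((_, (_, keysB, prod)) <- group; i <- keysA.indices; (col, j) <- keysB.zipWithIndex)
        out(i)(col) = prod(i)(j)
      (keysA, out)
    }
  }
}
```

If `bKeyed` were keyed by each B block's own index instead, the join would pair A block i only with B block i, and in this sketch the remaining columns of each output row block would stay zero. Replication trades extra shuffle volume (each B block is sent once per A block) for a complete set of block pairs.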

ASF GitHub Bot (JIRA)
2016-04-10 17:00:26 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234194#comment-15234194 ]

ASF GitHub Bot commented on MAHOUT-1750:
----------------------------------------

Github user andrewpalumbo commented on the pull request:

https://github.com/apache/mahout/pull/215#issuecomment-208015484

@rmetzger To reproduce, you'd have to pull this branch and set the `$MAHOUT_HOME` environment variable to point at the base directory. Then, from `$MAHOUT_HOME`, run `mvn clean install -DskipTests && cd flink && mvn test -Dsuites="RLikeDrmOpsSuite"`.

Or you could run the `RLikeDrmOpsSuite` suite from IntelliJ, but make sure that you set the `MAHOUT_HOME` env variable in the "Run/Debug Configurations" for the test.

Thanks!

ASF GitHub Bot (JIRA)
2016-04-10 17:07:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234196#comment-15234196 ]

ASF GitHub Bot commented on MAHOUT-1750:
----------------------------------------

Github user andrewpalumbo commented on the pull request:

https://github.com/apache/mahout/pull/215#issuecomment-208017898

or rather `mvn clean install -DskipTests && cd flink && mvn test -Dsuites="*RLikeDrmOpsSuite"`

ASF GitHub Bot (JIRA)
2016-04-10 17:48:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234224#comment-15234224 ]

ASF GitHub Bot commented on MAHOUT-1750:
----------------------------------------

Github user rmetzger commented on the pull request:

https://github.com/apache/mahout/pull/215#issuecomment-208029189

I think the problem is the `TransposedMatrixView` class.

See: [image: recursive-mahout]

And this code example

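```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.runtime.util.DataOutputSerializer // assumed Flink 1.0.x location; newer Flink: org.apache.flink.core.memory
import org.apache.mahout.math.{SparseMatrix, TransposedMatrixView}

object TransposedViewKryoRepro {
  def main(args: Array[String]): Unit = {
    val ser = new KryoSerializer[TransposedMatrixView](classOf[TransposedMatrixView], new ExecutionConfig())
    val matrix = new SparseMatrix(15, 15)
    // Serializing a transposed view over a sparse matrix with Kryo triggers the error
    val inst = new TransposedMatrixView(matrix)
    ser.serialize(inst, new DataOutputSerializer(64))
  }
}
```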

which will reproduce the error.

ASF GitHub Bot (JIRA)
2016-04-10 20:53:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234275#comment-15234275 ]

ASF GitHub Bot commented on MAHOUT-1750:
----------------------------------------

Github user andrewpalumbo commented on the pull request:

https://github.com/apache/mahout/pull/215#issuecomment-208068896

Thank you very much for looking into this, Robert. It is much appreciated. I am having trouble understanding what the issue is with this class. So you think that it is the Enumeration being returned by the `getStructure()` method in `TransposedMatrixView` that is the problem?

Andrew Palumbo (JIRA)
2016-04-10 23:19:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo updated MAHOUT-1750:
-----------------------------------
Fix Version/s: (was: 0.12.0)
0.12.1

Andrew Palumbo (JIRA)
2016-04-10 23:19:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo updated MAHOUT-1750:
-----------------------------------
Component/s: Flink

Suneel Marthi (JIRA)
2016-04-18 01:05:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15245021#comment-15245021 ]

Suneel Marthi commented on MAHOUT-1750:
---------------------------------------

Given that the Flink folks acknowledge this issue to be a problem in the Flink codebase and have a fix in the pending Flink 1.0.2 release, can we revert the matrix setting back to 500 * 500?

Suneel Marthi (JIRA)
2016-04-18 01:05:25 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15245021#comment-15245021 ]

Suneel Marthi edited comment on MAHOUT-1750 at 4/18/16 1:05 AM:
----------------------------------------------------------------

Given that the Flink folks acknowledge this issue to be a problem in the Flink codebase and have a fix in the pending Flink 1.0.2 release, can we revert the matrix setting back to 500 * 500?


was (Author: smarthi):
Given that the Flink folks acknowledge this issue to be a problem in the flink codebase and has been fixed in the pending Flink 1.0.2 release, can we revert the matrix setting back to 500 * 500 ?

ASF GitHub Bot (JIRA)
2016-04-26 19:51:13 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15258793#comment-15258793 ]

ASF GitHub Bot commented on MAHOUT-1750:
----------------------------------------

Github user andrewpalumbo closed the pull request at:

https://github.com/apache/mahout/pull/215

Andrew Palumbo (JIRA)
2016-04-27 21:26:12 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on MAHOUT-1750 stopped by Andrew Palumbo.
----------------------------------------------

Andrew Palumbo (JIRA)
2016-05-17 19:11:13 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo updated MAHOUT-1750:
-----------------------------------
Fix Version/s: (was: 0.12.1)
1.0.0

Andrew Palumbo (JIRA)
2016-05-17 19:11:13 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo updated MAHOUT-1750:
-----------------------------------
Assignee: (was: Andrew Palumbo)

Andrew Palumbo (JIRA)
2016-12-20 18:58:58 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo updated MAHOUT-1750:
-----------------------------------
Sprint: (was: Jan/Feb-2017)

Andrew Palumbo (JIRA)
2016-12-20 18:58:58 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Palumbo updated MAHOUT-1750:
-----------------------------------
Sprint: Jan/Feb-2017 (was: Jan/Feb-2016)

Suneel Marthi (JIRA)
2016-12-26 06:43:58 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Suneel Marthi resolved MAHOUT-1750.
-----------------------------------
Resolution: Won't Fix

Suneel Marthi (JIRA)
2016-12-26 06:43:58 UTC
[ https://issues.apache.org/jira/browse/MAHOUT-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Suneel Marthi updated MAHOUT-1750:
----------------------------------
Fix Version/s: (was: 1.0.0)
0.13.0