diff --git a/bin/samoa b/bin/samoa
index 1a26caa9..7579fdcb 100755
--- a/bin/samoa
+++ b/bin/samoa
@@ -205,6 +205,36 @@ elif [ $PLATFORM = 'STORM' ]; then
java -cp $CLASSPATH org.apache.samoa.LocalStormDoTask $COMPLETE_ARG $NUM_WORKER
fi
+elif [ $PLATFORM = 'GEARPUMP' ]; then
+
+ echo "Deploying to $PLATFORM"
+ if [ -z $GEARPUMP_HOME ];then
+ echo "GEARPUMP_HOME is not set!"
+ echo "Please set GEARPUMP_HOME to point to your Gearpump installation"
+ exit -1
+ fi
+
+ if [ ! -f $2 ];then
+ echo "$2 does not exist!"
+ echo "Please use a valid jar file for Gearpump execution"
+ exit -1
+ fi
+
+ GEARPUMP_EXEC="sh $GEARPUMP_HOME/bin/gear"
+
+ COMPLETE_ARG=""
+ COUNTER=0
+ for var in "$@"
+ do
+ COUNTER=`expr $COUNTER + 1`
+ if [ $COUNTER -gt 2 ];then
+ COMPLETE_ARG="$COMPLETE_ARG $var"
+ fi
+ done
+
+ DEPLOYABLE=$JAR_PATH
+ $GEARPUMP_EXEC app -jar $DEPLOYABLE org.apache.samoa.topology.impl.gearpump.DoTask $COMPLETE_ARG
+
elif [ $PLATFORM = 'SAMZA' ]; then
echo "Deploying to SAMZA"
diff --git a/pom.xml b/pom.xml
index 33049375..9e039c04 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,15 @@
samoa-test
+
+ gearpump
+
+ samoa-instances
+ samoa-api
+ samoa-gearpump
+ samoa-test
+
+
all
@@ -103,6 +112,7 @@
samoa-storm
samoa-flink
samoa-samza
+ samoa-gearpump
samoa-test
@@ -127,6 +137,9 @@
1.0.3
0.7.0
0.10.1
+ 0.8.1
+ 2.11
+ 2.11.5
1.7.2
1.7.5
2.18
@@ -210,6 +223,7 @@
samoa-storm
samoa-flink
samoa-samza
+ samoa-gearpump
samoa-test
samoa-threads
bin
diff --git a/samoa-gearpump/README.md b/samoa-gearpump/README.md
new file mode 100644
index 00000000..4bb8363f
--- /dev/null
+++ b/samoa-gearpump/README.md
@@ -0,0 +1,23 @@
+# Executing Apache SAMOA with Apache Gearpump
+
+In this tutorial README we describe how to execute Apache SAMOA on top of [Apache Gearpump(incubating)](http://gearpump.apache.org/).
+
+## Build
+
+Simply clone the repository and install SAMOA.
+
+```
+git clone http://git.apache.org/incubator-samoa.git
+cd incubator-samoa
+mvn -Pgearpump package
+```
+
+The deployable jar for SAMOA will be in `target/SAMOA-gearpump-0.4.0-incubating-SNAPSHOT.jar`.
+
+## Executing SAMOA with Gearpump step-by-step
+
+1. Ensure that you already have Gearpump running. You can follow this [tutorial](http://gearpump.apache.org/releases/latest/deployment-local.html) to deploy Gearpump in local mode.
+2. Set `GEARPUMP_HOME` to point to your Gearpump installation path.
+3. In the SAMOA path, you can input command to execute SAMOA tasks. For example, `bin/samoa gearpump target/SAMOA-gearpump-0.4.0-incubating-SNAPSHOT.jar "PrequentialEvaluation -d /tmp/dump.csv -i 1000000 -f 100000 -l (classifiers.trees.VerticalHoeffdingTree -p 4) -s (generators.RandomTreeGenerator -c 2 -o 10 -u 10)"`
+
+
diff --git a/samoa-gearpump/pom.xml b/samoa-gearpump/pom.xml
new file mode 100644
index 00000000..0fa23ccc
--- /dev/null
+++ b/samoa-gearpump/pom.xml
@@ -0,0 +1,163 @@
+
+
+
+
+ 4.0.0
+
+ UTF-8
+
+
+ samoa-gearpump
+ gearpump bindings for SAMOA
+
+ samoa-gearpump
+
+ org.apache.samoa
+ samoa
+ 0.4.0-incubating-SNAPSHOT
+
+
+
+
+ apache.snapshots
+ Apache Development Snapshot Repository
+ https://repository.apache.org/content/repositories/snapshots/
+
+ false
+
+
+ true
+
+
+
+ gearpump-shaded-repo
+ Vincent at Bintray
+ http://dl.bintray.com/fvunicorn/maven
+
+
+
+
+
+ org.apache.samoa
+ samoa-api
+ ${project.version}
+
+
+ org.apache.samoa
+ samoa-test
+ test-jar
+ test-jar-with-dependencies
+ ${project.version}
+ test
+
+
+ org.apache.gearpump
+ gearpump-streaming_${scala.binary.version}
+ ${gearpump.version}
+
+
+ org.apache.gearpump
+ gearpump-daemon_${scala.binary.version}
+ ${gearpump.version}
+
+
+ org.apache.gearpump
+ gearpump-experimental-cgroup_${scala.binary.version}
+
+
+
+
+ org.slf4j
+ slf4j-log4j12
+ ${slf4j-log4j12.version}
+ test
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+ 3.2.2
+
+
+ scala-compile-first
+ process-resources
+
+ add-source
+ compile
+
+
+
+ scala-test-compile-first
+ process-test-resources
+
+ testCompile
+
+
+
+
+
+
+ maven-assembly-plugin
+ ${maven-assembly-plugin.version}
+
+ SAMOA-gearpump-${project.version}
+ false
+ false
+ ../target
+
+ jar-with-dependencies
+
+
+
+ ${parsedVersion.osgiVersion}
+ ${project.description}
+ ${project.version}
+ Yahoo Labs
+ SAMOA
+
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ ${maven-surefire-plugin.version}
+
+ -Xmx1G
+ false
+
+
+
+
+
diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/ComponentFactory.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/ComponentFactory.java
new file mode 100644
index 00000000..14574610
--- /dev/null
+++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/ComponentFactory.java
@@ -0,0 +1,58 @@
+package org.apache.samoa.topology.impl.gearpump;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+
+public class ComponentFactory implements org.apache.samoa.topology.ComponentFactory {
+ @Override
+ public ProcessingItem createPi(Processor processor) {
+ return createPi(processor, 1);
+ }
+
+ @Override
+ public ProcessingItem createPi(Processor processor, int parallelism) {
+ return new GearpumpProcessingItem(processor, parallelism);
+ }
+
+ @Override
+ public EntranceProcessingItem createEntrancePi(EntranceProcessor
+ entranceProcessor) {
+ return new GearpumpEntranceProcessingItem(entranceProcessor);
+ }
+
+ @Override
+ public Stream createStream(IProcessingItem sourcePi) {
+ TopologyNode topologyNode = (TopologyNode) sourcePi;
+ return topologyNode.createStream();
+ }
+
+ @Override
+ public Topology createTopology(String topoName) {
+ return new GearpumpTopology(topoName);
+ }
+}
diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/DoTask.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/DoTask.java
new file mode 100644
index 00000000..90832fc1
--- /dev/null
+++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/DoTask.java
@@ -0,0 +1,42 @@
+package org.apache.samoa.topology.impl.gearpump;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.streaming.StreamApplication;
+import org.apache.gearpump.util.Graph;
+
+public class DoTask {
+
+ public static void main(String[] args) {
+
+ GearpumpTopology topology = Utils.argsToTopology(args);
+ String topologyName = topology.getTopologyName();
+ Graph graph = topology.getGraph();
+ StreamApplication app = StreamApplication.apply(topologyName, graph, UserConfig.empty());
+ ClientContext context = ClientContext.apply();
+ context.submit(app);
+ context.close();
+
+ }
+
+}
diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/EntranceProcessingItemTask.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/EntranceProcessingItemTask.java
new file mode 100644
index 00000000..857bffa2
--- /dev/null
+++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/EntranceProcessingItemTask.java
@@ -0,0 +1,73 @@
+package org.apache.samoa.topology.impl.gearpump;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.gearpump.Message;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.streaming.task.StartTime;
+import org.apache.gearpump.streaming.task.Task;
+import org.apache.gearpump.streaming.task.TaskContext;
+
+import org.apache.samoa.core.EntranceProcessor;
+
+public class EntranceProcessingItemTask extends Task {
+ EntranceProcessor entranceProcessor;
+ private TaskContext taskContext;
+ private UserConfig userConfig;
+ private GearpumpStream outputStream;
+
+ public EntranceProcessingItemTask(TaskContext taskContext, UserConfig userConf) {
+ super(taskContext, userConf);
+ this.taskContext = taskContext;
+ this.userConfig = userConf;
+ byte[] bytes = userConf.getBytes(Utils.entrancePiConf).get();
+ GearpumpEntranceProcessingItem entranceProcessingItem =
+ ((GearpumpEntranceProcessingItem) Utils.bytesToObject(bytes));
+ this.entranceProcessor = entranceProcessingItem.getProcessor();
+ this.outputStream = entranceProcessingItem.getStream();
+ }
+
+ @Override
+ public void onStart(StartTime startTime) {
+ outputStream.setTaskContext(this.taskContext);
+
+ entranceProcessor.onCreate(taskContext.taskId().index());
+ self().tell(new Message("start", System.currentTimeMillis()), self());
+ }
+
+ @Override
+ public void onNext(Message msg) {
+ if (entranceProcessor.hasNext()) {
+ GearpumpMessage message =
+ new GearpumpMessage(entranceProcessor.nextEvent(), outputStream.getTargetId(),
+ outputStream.getScheme());
+ taskContext.output(new Message(message, System.currentTimeMillis()));
+ }
+ self().tell(new Message("continue", System.currentTimeMillis()), self());
+ }
+
+ @Override
+ public void onStop() {
+ super.onStop();
+ }
+
+}
+
diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpEntranceProcessingItem.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpEntranceProcessingItem.java
new file mode 100644
index 00000000..d2a490d3
--- /dev/null
+++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpEntranceProcessingItem.java
@@ -0,0 +1,78 @@
+package org.apache.samoa.topology.impl.gearpump;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.streaming.Processor;
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.topology.AbstractEntranceProcessingItem;
+import org.apache.samoa.topology.Stream;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+public class GearpumpEntranceProcessingItem extends AbstractEntranceProcessingItem
+ implements TopologyNode, Serializable {
+
+ private GearpumpStream stream;
+
+ public GearpumpEntranceProcessingItem(EntranceProcessor entranceProcessor) {
+ super(entranceProcessor);
+ this.setName(entranceProcessor.getClass().getName());
+ }
+
+ @Override
+ public GearpumpStream createStream() {
+ GearpumpStream stream = new GearpumpStream(this);
+ this.stream = stream;
+ return stream;
+ }
+
+ @Override
+ public Processor createGearpumpProcessor() {
+ byte[] bytes = Utils.objectToBytes(this);
+ UserConfig userConfig = UserConfig.empty().withBytes(Utils.entrancePiConf, bytes);
+ return new org.apache.gearpump.streaming.Processor.DefaultProcessor<>(
+ 1, this.getName(), userConfig, EntranceProcessingItemTask.class);
+ }
+
+ public GearpumpStream getStream() {
+ return stream;
+ }
+
+ private void writeObject(java.io.ObjectOutputStream stream)
+ throws IOException {
+ stream.writeObject(getProcessor());
+ stream.writeObject(getName());
+ stream.writeObject(getOutputStream());
+ stream.writeObject(this.stream);
+ }
+
+ private void readObject(java.io.ObjectInputStream stream)
+ throws IOException, ClassNotFoundException {
+ setProcessor((org.apache.samoa.core.EntranceProcessor) stream.readObject());
+ setName((String) stream.readObject());
+ setOutputStream((Stream) stream.readObject());
+ this.stream = (GearpumpStream) stream.readObject();
+ }
+
+}
diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpMessage.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpMessage.java
new file mode 100644
index 00000000..d38bd6e0
--- /dev/null
+++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpMessage.java
@@ -0,0 +1,66 @@
+package org.apache.samoa.topology.impl.gearpump;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.utils.PartitioningScheme;
+
+import java.io.Serializable;
+
+public class GearpumpMessage implements Serializable {
+ private ContentEvent event;
+ private PartitioningScheme scheme;
+ private String targetId;
+
+ public GearpumpMessage() {
+ this(null, null, null);
+ }
+
+ public GearpumpMessage(ContentEvent event, String targetId, PartitioningScheme scheme) {
+ this.event = event;
+ this.targetId = targetId;
+ this.scheme = scheme;
+ }
+
+ public String getTargetId() {
+ return targetId;
+ }
+
+ public void setTargetId(String targetId) {
+ this.targetId = targetId;
+ }
+
+ public ContentEvent getEvent() {
+ return event;
+ }
+
+ public void setEvent(ContentEvent event) {
+ this.event = event;
+ }
+
+ public PartitioningScheme getScheme() {
+ return scheme;
+ }
+
+ public void setScheme(PartitioningScheme scheme) {
+ this.scheme = scheme;
+ }
+}
diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpProcessingItem.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpProcessingItem.java
new file mode 100644
index 00000000..2453a804
--- /dev/null
+++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpProcessingItem.java
@@ -0,0 +1,90 @@
+package org.apache.samoa.topology.impl.gearpump;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.gearpump.cluster.UserConfig;
+
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.AbstractProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.utils.PartitioningScheme;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+public class GearpumpProcessingItem extends AbstractProcessingItem implements TopologyNode, Serializable {
+ private static final long serialVersionUID = -9066409791668954099L;
+ private Set streams;
+
+ public GearpumpProcessingItem(Processor processor, int parallelism) {
+ super(processor, parallelism);
+ this.setName(processor.getClass().getSimpleName());
+ this.streams = new HashSet<>();
+ }
+
+ @Override
+ protected org.apache.samoa.topology.ProcessingItem addInputStream(Stream inputStream,
+ PartitioningScheme scheme) {
+ ((GearpumpStream) inputStream).setTargetPi(this);
+ ((GearpumpStream) inputStream).setScheme(scheme);
+ ((GearpumpStream) inputStream).setTargetId(this.getName());
+
+ return this;
+ }
+
+ @Override
+ public GearpumpStream createStream() {
+ GearpumpStream stream = new GearpumpStream(this);
+ streams.add(stream);
+ return stream;
+ }
+
+ @Override
+ public org.apache.gearpump.streaming.Processor createGearpumpProcessor() {
+ byte[] bytes = Utils.objectToBytes(this);
+ UserConfig userConfig = UserConfig.empty().withBytes(Utils.piConf, bytes);
+ return new org.apache.gearpump.streaming.Processor.DefaultProcessor<>(
+ this.getParallelism(), this.getName(), userConfig, ProcessingItemTask.class);
+ }
+
+ public Set getStreams() {
+ return streams;
+ }
+
+ private void writeObject(java.io.ObjectOutputStream stream)
+ throws IOException {
+ stream.writeObject(getProcessor());
+ stream.writeObject(getName());
+ stream.writeInt(getParallelism());
+ stream.writeObject(streams);
+ }
+
+ private void readObject(java.io.ObjectInputStream stream)
+ throws IOException, ClassNotFoundException {
+ setProcessor((Processor) stream.readObject());
+ setName((String) stream.readObject());
+ setParallelism(stream.readInt());
+ streams = (Set) stream.readObject();
+ }
+
+}
diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpStream.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpStream.java
new file mode 100644
index 00000000..146ce174
--- /dev/null
+++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpStream.java
@@ -0,0 +1,78 @@
+package org.apache.samoa.topology.impl.gearpump;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.gearpump.Message;
+import org.apache.gearpump.streaming.task.TaskContext;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.topology.AbstractStream;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.utils.PartitioningScheme;
+
+import java.io.Serializable;
+
+public class GearpumpStream extends AbstractStream implements Serializable {
+
+ private TaskContext taskContext;
+ private String targetId;
+ private IProcessingItem targetPi;
+ private PartitioningScheme scheme;
+
+ public GearpumpStream(IProcessingItem sourcePi) {
+ super(sourcePi);
+ }
+
+ public void setTaskContext(TaskContext taskContext) {
+ this.taskContext = taskContext;
+ }
+
+ public String getTargetId() {
+ return targetId;
+ }
+
+ public void setTargetId(String targetId) {
+ this.targetId = targetId;
+ }
+
+ public IProcessingItem getTargetPi() {
+ return targetPi;
+ }
+
+ public void setTargetPi(IProcessingItem targetPi) {
+ this.targetPi = targetPi;
+ }
+
+ public PartitioningScheme getScheme() {
+ return scheme;
+ }
+
+ public void setScheme(PartitioningScheme scheme) {
+ this.scheme = scheme;
+ }
+
+ @Override
+ public void put(ContentEvent event) {
+ GearpumpMessage message = new GearpumpMessage(event, targetId, scheme);
+ taskContext.output(new Message(message, System.currentTimeMillis()));
+ }
+
+}
diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpTopology.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpTopology.java
new file mode 100644
index 00000000..f53ada32
--- /dev/null
+++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpTopology.java
@@ -0,0 +1,71 @@
+package org.apache.samoa.topology.impl.gearpump;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.gearpump.partitioner.Partitioner;
+import org.apache.gearpump.streaming.Processor;
+import org.apache.gearpump.util.Graph;
+
+import org.apache.samoa.topology.AbstractTopology;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.Stream;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class GearpumpTopology extends AbstractTopology {
+
+ private Graph graph;
+ private Map piToProcessor;
+
+ public GearpumpTopology(String name) {
+ super(name);
+ graph = Graph.empty();
+ piToProcessor = new HashMap<>();
+ }
+
+ public Graph getGraph() {
+ buildGraph();
+ return graph;
+ }
+
+ private void buildGraph() {
+ Set processingItems = getProcessingItems();
+ for (IProcessingItem procItem : processingItems) {
+ TopologyNode gearpumpNode = (TopologyNode) procItem;
+ Processor gearpumpProcessor = gearpumpNode.createGearpumpProcessor();
+ piToProcessor.put(procItem, gearpumpProcessor);
+ graph.addVertex(gearpumpProcessor);
+ }
+
+ Set streams = getStreams();
+ Partitioner partitioner = new SamoaMessagePartitioner();
+ for (Stream stream : streams) {
+ GearpumpStream gearpumpStream = (GearpumpStream) stream;
+ IProcessingItem sourcePi = gearpumpStream.getSourceProcessingItem();
+ IProcessingItem targetPi = gearpumpStream.getTargetPi();
+ Processor sourceProcessor = piToProcessor.get(sourcePi);
+ Processor targetProcessor = piToProcessor.get(targetPi);
+ graph.addEdge(sourceProcessor, partitioner, targetProcessor);
+ }
+ }
+}
diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/ProcessingItemTask.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/ProcessingItemTask.java
new file mode 100644
index 00000000..8f2e0dd2
--- /dev/null
+++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/ProcessingItemTask.java
@@ -0,0 +1,77 @@
+package org.apache.samoa.topology.impl.gearpump;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.gearpump.Message;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.streaming.task.StartTime;
+import org.apache.gearpump.streaming.task.Task;
+import org.apache.gearpump.streaming.task.TaskContext;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+
+import java.util.Set;
+
+public class ProcessingItemTask extends Task {
+
+ private TaskContext taskContext;
+ private UserConfig userConfig;
+ private Processor processor;
+ private Set streams;
+
+ public ProcessingItemTask(TaskContext taskContext, UserConfig userConf) {
+ super(taskContext, userConf);
+ this.taskContext = taskContext;
+ this.userConfig = userConf;
+ byte[] bytes = userConf.getBytes(Utils.piConf).get();
+ GearpumpProcessingItem pi = (GearpumpProcessingItem) Utils.bytesToObject(bytes);
+ this.processor = pi.getProcessor();
+ this.streams = pi.getStreams();
+ }
+
+ public void setProcessor(Processor processor) {
+ this.processor = processor;
+ }
+
+ @Override
+ public void onStart(StartTime startTime) {
+ for (GearpumpStream stream : streams) {
+ stream.setTaskContext(this.taskContext);
+ }
+
+ processor.onCreate(taskContext.taskId().index());
+ }
+
+ @Override
+ public void onNext(Message msg) {
+ GearpumpMessage message = (GearpumpMessage) msg.msg();
+ String targetId = message.getTargetId();
+ if (targetId.equals(this.processor.getClass().getSimpleName())) {
+ ContentEvent event = message.getEvent();
+ processor.process(event);
+ }
+ }
+
+ @Override
+ public void onStop() {
+ }
+}
diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SamoaMessagePartitioner.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SamoaMessagePartitioner.java
new file mode 100644
index 00000000..b3ed655c
--- /dev/null
+++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SamoaMessagePartitioner.java
@@ -0,0 +1,60 @@
+package org.apache.samoa.topology.impl.gearpump;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2016 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.gearpump.Message;
+import org.apache.gearpump.partitioner.BroadcastPartitioner;
+import org.apache.gearpump.partitioner.HashPartitioner;
+import org.apache.gearpump.partitioner.MulticastPartitioner;
+import org.apache.gearpump.partitioner.ShufflePartitioner;
+
+public class SamoaMessagePartitioner implements MulticastPartitioner {
+ ShufflePartitioner shufflePartitioner = new ShufflePartitioner();
+ BroadcastPartitioner broadcastPartitioner = new BroadcastPartitioner();
+ HashPartitioner hashPartitioner = new HashPartitioner();
+
+ @Override
+ public int[] getPartitions(Message msg, int partitionNum, int currentPartitionId) {
+ GearpumpMessage message = (GearpumpMessage) msg.msg();
+ int[] partitions = null;
+ switch (message.getScheme()) {
+ case SHUFFLE:
+ partitions = new int[]{
+ shufflePartitioner.getPartition(msg, partitionNum, currentPartitionId)
+ };
+ break;
+ case BROADCAST:
+ partitions = broadcastPartitioner.getPartitions(msg, partitionNum);
+ break;
+ case GROUP_BY_KEY:
+ partitions = new int[]{
+ hashPartitioner.getPartition(msg, partitionNum, currentPartitionId)
+ };
+ break;
+ }
+ return partitions;
+ }
+
+ @Override
+ public int[] getPartitions(Message msg, int partitionNum) {
+ return this.getPartitions(msg, partitionNum, -1);
+ }
+}
diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SerializableSerializer.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SerializableSerializer.java
new file mode 100644
index 00000000..4a14a2ae
--- /dev/null
+++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SerializableSerializer.java
@@ -0,0 +1,70 @@
+package org.apache.samoa.topology.impl.gearpump;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * Serialize and deserialize objects with Java serialization
+ *
+ * @author Anh Thu Vu
+ */
+public class SerializableSerializer extends Serializer