diff --git a/bin/grid b/bin/grid index 5c590e41..84cb5087 100755 --- a/bin/grid +++ b/bin/grid @@ -91,7 +91,7 @@ install_samza() { git clone https://gitbox.apache.org/repos/asf/samza.git cd samza fi - ./gradlew -PscalaSuffix=2.11 clean publishToMavenLocal + ./gradlew -PscalaSuffix=2.12 clean publishToMavenLocal popd } diff --git a/build.gradle b/build.gradle index 07355765..8b1b8b8c 100644 --- a/build.gradle +++ b/build.gradle @@ -25,7 +25,7 @@ defaultTasks 'distTar' task wrapper(type: Wrapper) { description = "Updates gradlew and supporting files." - gradleVersion = '2.3' + gradleVersion = '4.8.1' } version = "$SAMZA_VERSION" diff --git a/gradle.properties b/gradle.properties index 972c966b..b07a0858 100644 --- a/gradle.properties +++ b/gradle.properties @@ -17,7 +17,7 @@ * under the License. */ -SAMZA_VERSION=1.7.0-SNAPSHOT +SAMZA_VERSION=1.7.0 KAFKA_VERSION=0.11.0.2 HADOOP_VERSION=2.7.1 diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index c91c2cc7..c0c2122c 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ #Fri Mar 27 16:28:33 PDT 2020 -distributionUrl=https\://services.gradle.org/distributions/gradle-2.3-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-4.8.1-all.zip distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStorePath=wrapper/dists diff --git a/src/main/java/samza/examples/cookbook/CouchbaseTableExample.java b/src/main/java/samza/examples/cookbook/CouchbaseTableExample.java index fdc5bc6c..b2c1b805 100644 --- a/src/main/java/samza/examples/cookbook/CouchbaseTableExample.java +++ b/src/main/java/samza/examples/cookbook/CouchbaseTableExample.java @@ -41,6 +41,8 @@ import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor; import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor; import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor; +import org.apache.samza.table.ReadWriteTable; +import org.apache.samza.table.ReadWriteUpdateTable; import org.apache.samza.table.descriptors.RemoteTableDescriptor; import org.apache.samza.table.remote.NoOpTableReadFunction; import org.apache.samza.table.remote.RemoteTable; @@ -190,18 +192,18 @@ public void describe(StreamApplicationDescriptor app) { static class MyCountFunction implements MapFunction { - private MyCouchbaseTableWriteFunction writeFn; + private ReadWriteTable readWriteTable; @Override public void init(Context context) { - RemoteTable table = (RemoteTable) context.getTaskContext().getTable("couchbase-table"); - writeFn = (MyCouchbaseTableWriteFunction) table.getWriteFunction(); + readWriteTable = (ReadWriteTable) context.getTaskContext().getTable("couchbase-table"); } @Override public String apply(String word) { - CompletableFuture countFuture = writeFn.incCounter(word); - CompletableFuture totalCountFuture = writeFn.incCounter(TOTAL_COUNT_ID); + CompletableFuture countFuture = readWriteTable.writeAsync(MyCouchbaseTableWriteFunction.OP_COUNTER, word); + CompletableFuture totalCountFuture = readWriteTable.writeAsync(MyCouchbaseTableWriteFunction.OP_COUNTER, TOTAL_COUNT_ID); + return String.format("%s word=%s, count=%d, total-count=%d", currentTime(), word, countFuture.join(), totalCountFuture.join()); } diff --git a/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java b/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java index 4f5c5f77..06805442 100644 --- a/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java +++ b/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java @@ -128,7 +128,7 @@ public void describe(StreamApplicationDescriptor appDescriptor) { MessageStream stockSymbolStream = appDescriptor.getInputStream(stockSymbolInputDescriptor); OutputStream stockPriceStream = appDescriptor.getOutputStream(stockPriceOutputDescriptor); - RemoteTableDescriptor remoteTableDescriptor = + RemoteTableDescriptor remoteTableDescriptor = new RemoteTableDescriptor("remote-table") .withReadRateLimit(10) .withReadFunction(new StockPriceReadFunction()); diff --git a/src/main/java/samza/examples/cookbook/data/AdClick.java b/src/main/java/samza/examples/cookbook/data/AdClick.java index 82925c71..c3de2198 100644 --- a/src/main/java/samza/examples/cookbook/data/AdClick.java +++ b/src/main/java/samza/examples/cookbook/data/AdClick.java @@ -30,6 +30,10 @@ public class AdClick { private String adId; // an unique id for the ad private String userId; // the user that clicked the ad + public AdClick(){ + + } + public AdClick( @JsonProperty("pageId") String pageId, @JsonProperty("adId") String adId, diff --git a/src/main/java/samza/examples/cookbook/data/PageView.java b/src/main/java/samza/examples/cookbook/data/PageView.java index 96406944..0d0ff77a 100644 --- a/src/main/java/samza/examples/cookbook/data/PageView.java +++ b/src/main/java/samza/examples/cookbook/data/PageView.java @@ -24,9 +24,13 @@ * A page view event */ public class PageView { - public final String userId; - public final String country; - public final String pageId; + public String userId; + public String country; + public String pageId; + + public PageView(){ + + } /** * Constructs a page view event.