Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public final void initConfig(GiraphConfiguration conf) {
} else {
TypeOps<?> edgeValueTypeOps =
TypeOpsUtils.getTypeOpsOrNull(edgeValueClass);
if (edgeValueTypeOps != null) {
if (edgeValueTypeOps != null && idTypeOps != null) {
GiraphConstants.VERTEX_EDGES_CLASS.set(
conf, IdAndValueArrayEdges.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ boolean registerPersistentAggregator(
* Broadcast given value to all workers for next computation.
* @param value Value to broadcast
*/
<T extends Writable> BroadcastHandle<T> broadcast(T value);
<T> BroadcastHandle<T> broadcast(T value);

/**
* Call this to log a line to command line of the job. Use in moderation -
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.reducers.ReduceOperation;
import org.apache.giraph.writable.kryo.KryoWritableWrapper;
import org.apache.hadoop.io.Writable;

/**
Expand Down Expand Up @@ -118,9 +119,10 @@ public <A extends Writable> void setAggregatedValue(String name, A value) {
}

@Override
public <T extends Writable> BroadcastHandle<T> broadcast(T object) {
public <T> BroadcastHandle<T> broadcast(T object) {
BroadcastHandleImpl<T> handle = new BroadcastHandleImpl<>();
master.broadcast(handle.getName(), object);
master.broadcast(
handle.getName(), KryoWritableWrapper.wrapIfNeeded(object));
return handle;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.worker.WorkerAggregatorDelegator;
import org.apache.giraph.worker.WorkerGlobalCommUsage;
import org.apache.giraph.writable.kryo.KryoWritableWrapper;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

Expand Down Expand Up @@ -268,9 +269,9 @@ public void broadcast(String name, Writable value) {
}

@Override
public <T extends Writable> BroadcastHandle<T> broadcast(T object) {
public <T> BroadcastHandle<T> broadcast(T object) {
BroadcastHandleImpl<T> handle = new BroadcastHandleImpl<>();
broadcast(handle.getName(), object);
broadcast(handle.getName(), KryoWritableWrapper.wrapIfNeeded(object));
return handle;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@
*/
package org.apache.giraph.block_app.framework.block;

import java.util.Collections;
import java.util.Iterator;

import org.apache.giraph.block_app.framework.piece.AbstractPiece;
import org.apache.giraph.function.Consumer;
import org.apache.giraph.function.Supplier;

import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;

/**
* Block that repeats another block until toQuit supplier returns true,
Expand Down Expand Up @@ -56,19 +55,18 @@ public static Block unlimited(Block block, Supplier<Boolean> toQuit) {

@Override
public Iterator<AbstractPiece> iterator() {
// nCopies uses constant memory, creating a looped list with single element
final Iterator<AbstractPiece> repeatIterator =
Iterables.concat(Collections.nCopies(repeatTimes, block)).iterator();
return new AbstractIterator<AbstractPiece>() {
return Iterators.concat(new AbstractIterator<Iterator<AbstractPiece>>() {
private int index = 0;

@Override
protected AbstractPiece computeNext() {
if (Boolean.TRUE.equals(toQuit.get()) || !repeatIterator.hasNext()) {
protected Iterator<AbstractPiece> computeNext() {
if (index >= repeatTimes || Boolean.TRUE.equals(toQuit.get())) {
return endOfData();
}

return repeatIterator.next();
index++;
return block.iterator();
}
};
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
import org.apache.giraph.block_app.framework.piece.global_comm.internal.CreateReducersApiWrapper;
import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler;
import org.apache.giraph.block_app.framework.piece.global_comm.map.MapHandle;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
import org.apache.giraph.block_app.framework.piece.messages.ObjectMessageClasses;
Expand All @@ -40,10 +41,12 @@
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.types.NoMessage;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ShortWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

Expand Down Expand Up @@ -323,4 +326,30 @@ protected final void reduceInt(
ReducerHandle<IntWritable, ?> reduceHandle, int value) {
reduceUtils.reduceInt(reduceHandle, value);
}

protected final void reduceShort(
ReducerHandle<ShortWritable, ?> reduceHandle, short value) {
reduceUtils.reduceShort(reduceHandle, value);
}

protected final void reduceByte(
ReducerHandle<ByteWritable, ?> reduceHandle, byte value) {
reduceUtils.reduceByte(reduceHandle, value);
}

public <T> T getMappedLong(MapHandle<LongWritable, T> map, long value) {
return reduceUtils.getMappedLong(map, value);
}

public <T> T getMappedInt(MapHandle<IntWritable, T> map, int value) {
return reduceUtils.getMappedInt(map, value);
}

public <T> T getMappedShort(MapHandle<ShortWritable, T> map, short value) {
return reduceUtils.getMappedShort(map, value);
}

public <T> T getMappedByte(MapHandle<ByteWritable, T> map, byte value) {
return reduceUtils.getMappedByte(map, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
*/
package org.apache.giraph.block_app.framework.piece.global_comm;

import org.apache.giraph.block_app.framework.piece.global_comm.map.MapHandle;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ShortWritable;

/**
* Utility object with common primitive reduce operations,
Expand All @@ -31,6 +34,8 @@ public class ReduceUtilsObject {
private final FloatWritable reusableFloat = new FloatWritable();
private final LongWritable reusableLong = new LongWritable();
private final IntWritable reusableInt = new IntWritable();
private final ShortWritable reusableShort = new ShortWritable();
private final ByteWritable reusableByte = new ByteWritable();

// utility functions:
public void reduceDouble(
Expand All @@ -54,9 +59,52 @@ public void reduceLong(
reduceHandle.reduce(tmp);
}

public void reduceInt(ReducerHandle<IntWritable, ?> reduceHandle, int value) {
public void reduceInt(
ReducerHandle<IntWritable, ?> reduceHandle, int value) {
IntWritable tmp = reusableInt;
tmp.set(value);
reduceHandle.reduce(tmp);
}

public void reduceShort(
ReducerHandle<ShortWritable, ?> reduceHandle, short value) {
ShortWritable tmp = reusableShort;
tmp.set(value);
reduceHandle.reduce(tmp);
}

public void reduceByte(
ReducerHandle<ByteWritable, ?> reduceHandle, byte value) {
ByteWritable tmp = reusableByte;
tmp.set(value);
reduceHandle.reduce(tmp);
}

public <T> T getMappedLong(
MapHandle<LongWritable, T> map, long value) {
LongWritable tmp = reusableLong;
tmp.set(value);
return map.get(tmp);
}

public <T> T getMappedInt(
MapHandle<IntWritable, T> map, int value) {
IntWritable tmp = reusableInt;
tmp.set(value);
return map.get(tmp);
}

public <T> T getMappedShort(
MapHandle<ShortWritable, T> map, short value) {
ShortWritable tmp = reusableShort;
tmp.set(value);
return map.get(tmp);
}

public <T> T getMappedByte(
MapHandle<ByteWritable, T> map, byte value) {
ByteWritable tmp = reusableByte;
tmp.set(value);
return map.get(tmp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.worker.WorkerBroadcastUsage;
import org.apache.giraph.worker.WorkerReduceUsage;
import org.apache.giraph.writable.kryo.KryoWritableWrapper;
import org.apache.hadoop.io.Writable;

/**
Expand Down Expand Up @@ -97,7 +98,7 @@ public String getName() {

@Override
public T getBroadcast(WorkerBroadcastUsage worker) {
return worker.getBroadcast(name);
return KryoWritableWrapper.unwrapIfNeeded(worker.getBroadcast(name));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,23 @@ public String toString() {
};
}

/**
* Execute given function on master.
*/
public static
Piece<WritableComparable, Writable, Writable, NoMessage,
Object> masterCompute(
final String pieceName, final Consumer<BlockMasterApi> process) {
return new Piece<WritableComparable, Writable, Writable, NoMessage,
Object>() {
@Override
public void masterCompute(
BlockMasterApi masterApi, Object executionStage) {
process.apply(masterApi);
}
};
}

/**
* For each vertex execute given process function.
* Computation is happening in the receive phase of the returned Piece.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ public Vertex<I, V, E> getVertex(Number vertexId) {
return testGraph.getVertex(numberToVertexId(vertexId));
}

public E getEdge(Number vertexId, Number targetId) {
Vertex<I, V, E> vertex = getVertex(vertexId);
return vertex != null ?
vertex.getEdgeValue(numberToVertexId(targetId)) : null;
}

/**
* Get Vertex Value for a given id.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,23 @@ public class SyntheticGraphInit<I extends WritableComparable,
"test.SyntheticGraphCreator.ACTUAL_LOCALITY_RATIO", -1, "");

protected final Supplier<E> edgeSupplier;
protected final boolean allowDuplicates;

public SyntheticGraphInit(Supplier<E> edgeSupplier) {
public SyntheticGraphInit(Supplier<E> edgeSupplier, boolean allowDuplicates) {
this.edgeSupplier = edgeSupplier;
this.allowDuplicates = allowDuplicates;
}

public SyntheticGraphInit(Supplier<E> edgeSupplier) {
this(edgeSupplier, false);
}

public SyntheticGraphInit() {
this.edgeSupplier = null;
this(null);
}

public SyntheticGraphInit(boolean allowDuplicates) {
this(null, allowDuplicates);
}

@Override
Expand All @@ -79,8 +89,10 @@ public void modifyGraph(NumericTestGraph<I, V, E> graph) {
j = random.nextInt(numVertices);
}
} while (j == i);
graph.addSymmetricEdge(
i, j, edgeSupplier != null ? edgeSupplier.get() : null);
if (allowDuplicates || graph.getEdge(i, j) == null) {
graph.addSymmetricEdge(
i, j, edgeSupplier != null ? edgeSupplier.get() : null);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,19 @@
import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
import org.apache.giraph.block_app.framework.api.CreateReducersApi;
import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.framework.block.RepeatUntilBlock;
import org.apache.giraph.block_app.framework.block.SequenceBlock;
import org.apache.giraph.block_app.framework.piece.Piece;
import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
import org.apache.giraph.block_app.library.Pieces;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.function.Consumer;
import org.apache.giraph.function.ObjectTransfer;
import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.reducers.impl.SumReduce;
import org.apache.giraph.types.NoMessage;
Expand Down Expand Up @@ -175,4 +182,29 @@ public void vertexSend(Vertex<LongWritable, Writable, Writable> vertex) {
Assert.assertNull(graph.getVertex(new LongWritable(3)));
Assert.assertNotNull(graph.getVertex(new LongWritable(4)));
}

@Test
public void testRepeatUntilBlockFinishCurrentLoop() throws Exception {
final ObjectTransfer<Boolean> toQuit = new ObjectTransfer<>();
final IntRef counter = new IntRef(5);
Block counterPiece = Pieces.masterCompute("Count", new Consumer<BlockMasterApi>() {
@Override
public void apply(BlockMasterApi input) {
counter.value--;
if (counter.value == 0) {
toQuit.apply(true);
}
}
});
Block innerBlock = new SequenceBlock(counterPiece, counterPiece, counterPiece, counterPiece);
Block repeatBlock = RepeatUntilBlock.unlimited(
innerBlock,
toQuit
);

LocalBlockRunner.runBlock(createTestGraph(), repeatBlock, new Object());

Assert.assertEquals(-3, counter.value);
}

}
Loading