Class AggregatorManagerImpl
- java.lang.Object
-
- com.alibaba.graphscope.graph.impl.AggregatorManagerImpl
-
- All Implemented Interfaces:
AggregatorManager, org.apache.giraph.aggregators.AggregatorUsage, org.apache.giraph.worker.WorkerAggregatorUsage, org.apache.giraph.worker.WorkerBroadcastUsage, org.apache.giraph.worker.WorkerGlobalCommUsage, org.apache.giraph.worker.WorkerReduceUsage
public class AggregatorManagerImpl extends Object implements AggregatorManager, org.apache.giraph.worker.WorkerAggregatorUsage, org.apache.giraph.worker.WorkerGlobalCommUsage
-
-
Constructor Summary
AggregatorManagerImpl(ImmutableClassesGiraphConfiguration<?,?,?> conf, int workerId, int workerNum)
-
Method Summary
void acceptNettyMessage(NettyMessage aggregatorMessage)
    Accept a message from other worker, aggregate to me.
<A extends org.apache.hadoop.io.Writable> void aggregate(String name, A value)
    Add a new value.
void broadcast(String name, org.apache.hadoop.io.Writable value)
    Broadcast given value to all workers for next computation.
<A extends org.apache.hadoop.io.Writable> A getAggregatedValue(String name)
    Return current aggregated value.
<B extends org.apache.hadoop.io.Writable> B getBroadcast(String name)
    Get value broadcasted from master.
int getNumWorkers()
<R extends org.apache.hadoop.io.Writable> R getReduced(String name)
    Get reduced value from previous worker computation.
int getWorkerId()
void init(FFICommunicator communicator)
    Init the manager with Grape::Communicator, the actual logic depends on implementation.
void postMasterCompute()
void postSuperstep()
    Synchronize aggregator values between workers after superstep.
void preSuperstep()
void reduce(String name, Object value)
    Reduce given value.
void reduceMerge(String name, org.apache.hadoop.io.Writable value)
    Reduce given partial value.
<A extends org.apache.hadoop.io.Writable> boolean registerAggregator(String name, Class<? extends org.apache.giraph.aggregators.Aggregator<A>> aggregatorClass)
    Register an aggregator with a unique name.
<A extends org.apache.hadoop.io.Writable> boolean registerPersistentAggregator(String name, Class<? extends org.apache.giraph.aggregators.Aggregator<A>> aggregatorClass)
    Register a persistent aggregator with a unique name.
<S,R extends org.apache.hadoop.io.Writable> void registerReducer(String name, org.apache.giraph.reducers.ReduceOperation<S,R> reduceOp)
    Register reducer to be reduced in the next worker computation, using given name and operations.
<S,R extends org.apache.hadoop.io.Writable> void registerReducer(String name, org.apache.giraph.reducers.ReduceOperation<S,R> reduceOp, R globalInitialValue)
    Register reducer to be reduced in the next worker computation, using given name and operations, starting globally from globalInitialValue.
<A extends org.apache.hadoop.io.Writable> void setAggregatedValue(String name, A value)
    Set aggregated value.
-
-
-
Constructor Detail
-
AggregatorManagerImpl
public AggregatorManagerImpl(ImmutableClassesGiraphConfiguration<?,?,?> conf, int workerId, int workerNum)
-
-
Method Detail
-
init
public void init(FFICommunicator communicator)
Description copied from interface: AggregatorManager
Init the manager with Grape::Communicator, the actual logic depends on implementation.
Specified by:
init in interface AggregatorManager
Parameters:
communicator - communicator.
-
acceptNettyMessage
public void acceptNettyMessage(NettyMessage aggregatorMessage)
Accept a message from another worker and aggregate it into this manager.
Specified by:
acceptNettyMessage in interface AggregatorManager
Parameters:
aggregatorMessage - received message.
-
getWorkerId
public int getWorkerId()
Specified by:
getWorkerId in interface AggregatorManager
-
getNumWorkers
public int getNumWorkers()
Specified by:
getNumWorkers in interface AggregatorManager
-
registerAggregator
public <A extends org.apache.hadoop.io.Writable> boolean registerAggregator(String name, Class<? extends org.apache.giraph.aggregators.Aggregator<A>> aggregatorClass) throws InstantiationException, IllegalAccessException
Register an aggregator with a unique name.
Specified by:
registerAggregator in interface AggregatorManager
Type Parameters:
A - type param
Parameters:
name - aggregator name
aggregatorClass - the class
Throws:
InstantiationException
IllegalAccessException
-
registerPersistentAggregator
public <A extends org.apache.hadoop.io.Writable> boolean registerPersistentAggregator(String name, Class<? extends org.apache.giraph.aggregators.Aggregator<A>> aggregatorClass) throws InstantiationException, IllegalAccessException
Register a persistent aggregator with a unique name.
Specified by:
registerPersistentAggregator in interface AggregatorManager
Type Parameters:
A - type param
Parameters:
name - aggregator name
aggregatorClass - the implementation class
Throws:
InstantiationException
IllegalAccessException
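The page does not say how a persistent aggregator differs from a regular one. In Apache Giraph's aggregator contract, a regular aggregator has its value reset between supersteps while a persistent one keeps it; whether AggregatorManagerImpl follows exactly that behavior is an assumption here. The sketch below illustrates that contract with a hypothetical in-memory stand-in (PersistenceSketch and all of its methods are invented for illustration and are not part of this class or of Giraph). The boolean returned by register is assumed to mean "the name was not registered before".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in, NOT the real AggregatorManagerImpl: a sum aggregator
// over long values, used only to show regular vs. persistent reset behavior.
final class PersistenceSketch {
    private final Map<String, Long> values = new HashMap<>();
    private final Map<String, Boolean> persistent = new HashMap<>();

    // Returns false if the name is already taken (assumed meaning of the boolean).
    boolean register(String name, boolean isPersistent) {
        if (values.containsKey(name)) {
            return false;
        }
        values.put(name, 0L);
        persistent.put(name, isPersistent);
        return true;
    }

    // Adds a value to the named aggregator.
    void aggregate(String name, long value) {
        values.merge(name, value, Long::sum);
    }

    long get(String name) {
        return values.get(name);
    }

    // Assumed Giraph-style rule: only non-persistent aggregators are reset.
    void endSuperstep() {
        for (Map.Entry<String, Boolean> entry : persistent.entrySet()) {
            if (!entry.getValue()) {
                values.put(entry.getKey(), 0L);
            }
        }
    }

    public static void main(String[] args) {
        PersistenceSketch manager = new PersistenceSketch();
        manager.register("regular", false);
        manager.register("persistent", true);
        manager.aggregate("regular", 5);
        manager.aggregate("persistent", 5);
        manager.endSuperstep();
        manager.aggregate("regular", 2);
        manager.aggregate("persistent", 2);
        System.out.println(manager.get("regular"));    // 2
        System.out.println(manager.get("persistent")); // 7
    }
}
```

Under these assumptions, a running total across supersteps would use a persistent aggregator, and a per-superstep count would use a regular one.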
-
getAggregatedValue
public <A extends org.apache.hadoop.io.Writable> A getAggregatedValue(String name)
Return current aggregated value. Needs to be initialized if aggregate or setAggregatedValue have not been called before.
Specified by:
getAggregatedValue in interface AggregatorManager
Specified by:
getAggregatedValue in interface org.apache.giraph.aggregators.AggregatorUsage
Parameters:
name - name for the aggregator
Returns:
Aggregated value
-
getBroadcast
public <B extends org.apache.hadoop.io.Writable> B getBroadcast(String name)
Get value broadcasted from master.
Specified by:
getBroadcast in interface org.apache.giraph.worker.WorkerBroadcastUsage
Parameters:
name - Name of the broadcasted value
Returns:
Broadcasted value
-
reduce
public void reduce(String name, Object value)
Reduce given value.
Specified by:
reduce in interface AggregatorManager
Specified by:
reduce in interface org.apache.giraph.worker.WorkerReduceUsage
Parameters:
name - Name of the reducer
value - Single value to reduce
-
reduceMerge
public void reduceMerge(String name, org.apache.hadoop.io.Writable value)
Reduce given partial value.
Specified by:
reduceMerge in interface org.apache.giraph.worker.WorkerReduceUsage
Parameters:
name - Name of the reducer
value - Partial value to reduce
-
setAggregatedValue
public <A extends org.apache.hadoop.io.Writable> void setAggregatedValue(String name, A value)
Set aggregated value. Can be used for initialization or reset.
Specified by:
setAggregatedValue in interface AggregatorManager
Parameters:
name - name for the aggregator
value - Value to be set.
-
aggregate
public <A extends org.apache.hadoop.io.Writable> void aggregate(String name, A value)
Add a new value. The aggregation operation needs to be commutative and associative.
Specified by:
aggregate in interface AggregatorManager
Specified by:
aggregate in interface org.apache.giraph.worker.WorkerAggregatorUsage
Parameters:
name - a unique name that refers to an aggregator
value - Value to be aggregated.
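The commutativity and associativity requirement matters because values from different vertices and workers may be combined in any order. The following is a minimal, self-contained sketch of that point; AggregateOrderSketch and its fold helper are hypothetical names used only for illustration and do not call the real aggregator classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical illustration: applies an operation to values in arrival order,
// the way successive aggregate() calls might be applied.
final class AggregateOrderSketch {
    static long fold(List<Long> values, long neutral, BinaryOperator<Long> op) {
        long acc = neutral;
        for (long value : values) {
            acc = op.apply(acc, value);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Long> forward = Arrays.asList(3L, 5L, 7L);
        List<Long> backward = new ArrayList<>(forward);
        Collections.reverse(backward);

        // Sum is commutative and associative: arrival order does not matter.
        System.out.println(fold(forward, 0L, Long::sum));   // 15
        System.out.println(fold(backward, 0L, Long::sum));  // 15

        // "Last value wins" is not commutative: the result depends on order.
        System.out.println(fold(forward, 0L, (a, b) -> b));  // 7
        System.out.println(fold(backward, 0L, (a, b) -> b)); // 3
    }
}
```

An operation like the second one would give results that depend on message arrival order, which is why it is unsuitable for aggregate.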
-
registerReducer
public <S,R extends org.apache.hadoop.io.Writable> void registerReducer(String name, org.apache.giraph.reducers.ReduceOperation<S,R> reduceOp)
Register reducer to be reduced in the next worker computation, using given name and operations.
Specified by:
registerReducer in interface AggregatorManager
Type Parameters:
S - Single value type
R - Reduced value type
Parameters:
name - Name of the reducer
reduceOp - Reduce operations
-
registerReducer
public <S,R extends org.apache.hadoop.io.Writable> void registerReducer(String name, org.apache.giraph.reducers.ReduceOperation<S,R> reduceOp, R globalInitialValue)
Register reducer to be reduced in the next worker computation, using given name and operations, starting globally from globalInitialValue. (globalInitialValue is reduced only once; each worker still starts from the neutral initial value.)
Specified by:
registerReducer in interface AggregatorManager
Type Parameters:
S - Single value type
R - Reduced value type
Parameters:
name - Name of the reducer
reduceOp - Reduce operations
globalInitialValue - Global initial value
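The note about globalInitialValue can be illustrated with a small sketch. It assumes a sum reduction whose neutral value is 0; GlobalInitialValueSketch and its methods are hypothetical and only model the described semantics, not the actual implementation.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the described semantics for a sum reduction.
final class GlobalInitialValueSketch {
    // Neutral element of the assumed sum operation.
    static final long NEUTRAL = 0L;

    // Each worker starts from the neutral value, not from globalInitialValue.
    static long reduceWorker(List<Long> workerValues) {
        long partial = NEUTRAL;
        for (long value : workerValues) {
            partial += value;
        }
        return partial;
    }

    // The global initial value enters the result exactly once,
    // then the per-worker partial results are merged into it.
    static long reduceGlobal(long globalInitialValue, List<List<Long>> perWorkerValues) {
        long result = globalInitialValue;
        for (List<Long> workerValues : perWorkerValues) {
            result += reduceWorker(workerValues);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Long>> workers = Arrays.asList(
                Arrays.asList(1L, 2L),
                Arrays.asList(3L, 4L));
        // 100 is counted once: 100 + (1 + 2) + (3 + 4)
        System.out.println(reduceGlobal(100L, workers)); // 110
    }
}
```

If every worker instead started from the global initial value, two workers would produce 210 in this example, which is the double counting the note rules out.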
-
getReduced
public <R extends org.apache.hadoop.io.Writable> R getReduced(String name)
Get reduced value from previous worker computation.
Specified by:
getReduced in interface AggregatorManager
Type Parameters:
R - Reduced value type
Parameters:
name - Name of the reducer
Returns:
Reduced value
-
broadcast
public void broadcast(String name, org.apache.hadoop.io.Writable value)
Broadcast given value to all workers for next computation.
Specified by:
broadcast in interface AggregatorManager
Parameters:
name - Name of the broadcast object
value - Value
-
preSuperstep
public void preSuperstep()
Specified by:
preSuperstep in interface AggregatorManager
-
postSuperstep
public void postSuperstep()
Synchronize aggregator values between workers after superstep.
Specified by:
postSuperstep in interface AggregatorManager
-
postMasterCompute
public void postMasterCompute()
-
-