Apache Flink: Why do reduce or groupReduce transformations not operate in parallel? -


for example:

dataset<tuple1<long>> input = env.fromelements(1,2,3,4,5,6,7,8,9); dataset<tuple1<long>> sum = input.reduce(new reducefunction()<tuple1<long>,tuple1<long>>{    public tuple1<long> reduce(tuple1<long> value1,tuple1<long> value2){       return new tuple1<>(value1.f0 + value2.f0);    } } 

if above reduce transform not parallel operation, need use additional 2 transformation 'partitionbyhash' , 'mappartition' below:

dataset<tuple1<long>> input = env.fromelements(1,2,3,4,5,6,7,8,9); dataset<tuple1<long>> sum = input.partitionbyhash(0).mappartition(new mappartitionfunction()<tuple1<long>,tuple1<long>>{    public void map(iterable<tuple1<long>> values,collector<tuple1<long>> out){       long sum = getsum(values);       out.collect(new tuple1(sum));    } }).reduce(new reducefunction()<tuple1<long>,tuple1<long>>{    public tuple1<long> reduce(tuple1<long> value1,tuple1<long> value2){       return new tuple1<>(value1.f0 + value2.f0);    } } 

and why result of reduce transform still instance of dataset not instance of tuple1<long>

two answers, 2 questions:

(1) why reduce() not parallel

fabian gave explanation. operations parallel if applied key. otherwise pre-aggregation parallel.

in second example, make parallel introducing key. instead of complex workaround "mappartition()", can write (java 8 style)

dataset<tuple1<long>> input = ...; input.groupby(0).reduce( (a, b) -> new tuple1<>(a.f0 + b.f0); 

note input data small there 1 parallel task anyways. can see parallel pre-aggregation if use larger input, such as:

executionenvironment env =     executionenvironment.getexecutionenvironment(); env.setparallelism(10);  dataset<long> input = env.generatesequence(1, 100000000); dataset<long> sum = input.reduce ( (a, b) -> + b ); 

(2) why result of reduce() operation still dataset ?

a dataset still lazy representation of x in cluster. can continue use data in parallel program without triggering computation , fetching result data distributed workers driver program. allows write larger programs run entirely on distributed workers , lazily executed. no data ever fetched client , re-distributed parallel workers.

especially in iterative programs, powerful, entire loops work without ever involving client , needing re-deploy operators.

you can "x" calling "dataset.collext().get(0);" - makes explicit should executed , fetched.


Comments

Popular posts from this blog

ruby - Trying to change last to "x"s to 23 -

jquery - Clone last and append item to closest class -

c - Unrecognised emulation mode: elf_i386 on MinGW32 -