Apache Flink: Why do reduce or groupReduce transformations not operate in parallel?
For example:

    DataSet<Tuple1<Long>> input = env.fromElements(
            new Tuple1<>(1L), new Tuple1<>(2L), new Tuple1<>(3L),
            new Tuple1<>(4L), new Tuple1<>(5L), new Tuple1<>(6L),
            new Tuple1<>(7L), new Tuple1<>(8L), new Tuple1<>(9L));
    DataSet<Tuple1<Long>> sum = input.reduce(new ReduceFunction<Tuple1<Long>>() {
        @Override
        public Tuple1<Long> reduce(Tuple1<Long> value1, Tuple1<Long> value2) {
            return new Tuple1<>(value1.f0 + value2.f0);
        }
    });
If the reduce transformation above is not a parallel operation, do I need to use the two additional transformations partitionByHash and mapPartition, as below?

    DataSet<Tuple1<Long>> input = env.fromElements(
            new Tuple1<>(1L), new Tuple1<>(2L), new Tuple1<>(3L),
            new Tuple1<>(4L), new Tuple1<>(5L), new Tuple1<>(6L),
            new Tuple1<>(7L), new Tuple1<>(8L), new Tuple1<>(9L));
    DataSet<Tuple1<Long>> sum = input
        .partitionByHash(0)
        .mapPartition(new MapPartitionFunction<Tuple1<Long>, Tuple1<Long>>() {
            @Override
            public void mapPartition(Iterable<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {
                long sum = getSum(values); // helper that sums the values of this partition
                out.collect(new Tuple1<>(sum));
            }
        })
        .reduce(new ReduceFunction<Tuple1<Long>>() {
            @Override
            public Tuple1<Long> reduce(Tuple1<Long> value1, Tuple1<Long> value2) {
                return new Tuple1<>(value1.f0 + value2.f0);
            }
        });
And why is the result of the reduce transformation still an instance of DataSet and not an instance of Tuple1<Long>?
Two answers for the two questions:
(1) Why is reduce() not parallel?
Fabian gave a good explanation: operations are parallel if they are applied per key. Otherwise, only the pre-aggregation runs in parallel.
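The pre-aggregation pattern can be pictured with plain Java streams (this is only an analogy for illustration, not the Flink API): each parallel instance sums its own partition locally, and a single final task merges the few partial results.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class PreAggregation {
    // Phase 1 (parallel): each "partition" is summed independently,
    // like Flink's combiner-style pre-aggregation.
    static List<Long> preAggregate(List<long[]> partitions) {
        return partitions.parallelStream()
                .map(p -> LongStream.of(p).sum())
                .collect(Collectors.toList());
    }

    // Phase 2 (parallelism 1): the few partial sums are merged by one task.
    static long merge(List<Long> partialSums) {
        return partialSums.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        List<long[]> partitions = List.of(
                new long[]{1, 2, 3}, new long[]{4, 5, 6}, new long[]{7, 8, 9});
        System.out.println(merge(preAggregate(partitions))); // 45
    }
}
```

The expensive first phase scales with the number of partitions; only the cheap merge of one value per partition is serialized.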
In your second example, you can make it parallel by introducing a key. Instead of the complex workaround with mapPartition(), you can simply write (Java 8 style):
    DataSet<Tuple1<Long>> input = ...;
    input.groupBy(0).reduce((a, b) -> new Tuple1<>(a.f0 + b.f0));
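The reason a keyed reduce scales is that each key group can be reduced independently by a different task. A plain-Java analogy (the Pair record here is a hypothetical stand-in for a keyed tuple, not a Flink class):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class KeyedReduce {
    // Hypothetical stand-in for a keyed tuple; not a Flink type.
    record Pair(String key, long value) {}

    // Each key's group is reduced independently, so different keys can be
    // processed by different parallel tasks.
    static Map<String, Long> sumPerKey(List<Pair> input) {
        return input.parallelStream()
                .collect(Collectors.groupingByConcurrent(
                        Pair::key,
                        Collectors.reducing(0L, Pair::value, Long::sum)));
    }

    public static void main(String[] args) {
        List<Pair> data = List.of(
                new Pair("a", 1), new Pair("b", 2),
                new Pair("a", 3), new Pair("b", 4));
        Map<String, Long> sums = sumPerKey(data);
        System.out.println(sums.get("a") + " " + sums.get("b")); // 4 6
    }
}
```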
Note that the input data is so small that there will be only one parallel task anyway. You can see the parallel pre-aggregation if you use a larger input, such as:
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(10);
    DataSet<Long> input = env.generateSequence(1, 100000000);
    DataSet<Long> sum = input.reduce((a, b) -> a + b);
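As a sanity check on what that job should compute (plain Java, not Flink): the sum of 1..100000000 has the closed form n(n+1)/2, and a parallel streamed sum agrees with it.

```java
import java.util.stream.LongStream;

public class SumCheck {
    // Closed-form sum of 1..n.
    static long closedForm(long n) {
        return n * (n + 1) / 2;
    }

    public static void main(String[] args) {
        long n = 100_000_000L;
        // Parallel streamed sum, analogous to the parallel reduce in the example.
        long streamed = LongStream.rangeClosed(1, n).parallel().sum();
        System.out.println(streamed == closedForm(n)); // true
        System.out.println(closedForm(n));             // 5000000050000000
    }
}
```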
(2) Why is the result of a reduce() operation still a DataSet?
A DataSet<X> is still a lazy representation of X in the cluster. You can continue to use the data in the parallel program without triggering the computation and without fetching the result data from the distributed workers to the driver program. That allows you to write larger programs that run entirely on the distributed workers and are lazily executed. No data is ever fetched to the client and re-distributed to the parallel workers.
Especially in iterative programs, this is powerful, as the entire loops work without ever involving the client or needing to re-deploy operators.
You can always get the X by calling "dataSet.collect().get(0);" - that makes it explicit that something should be executed and fetched.
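The laziness can be sketched in plain Java (a toy analogy, not the DataSet API): composing transformations only builds a plan, and nothing executes until collect() is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

public class LazyDataset {
    static int executions = 0; // counts how often the plan actually runs

    // Toy lazy pipeline: map() only composes functions, collect() executes.
    record Plan(Supplier<List<Long>> source, UnaryOperator<List<Long>> ops) {
        Plan map(UnaryOperator<Long> f) {
            return new Plan(source, list -> {
                List<Long> out = new ArrayList<>();
                for (Long v : ops.apply(list)) out.add(f.apply(v));
                return out;
            });
        }

        List<Long> collect() {
            executions++;
            return ops.apply(source.get());
        }
    }

    public static void main(String[] args) {
        Plan plan = new Plan(() -> List.of(1L, 2L, 3L), list -> list)
                .map(v -> v + 1)
                .map(v -> v * 10);
        System.out.println(executions);     // 0 -- building the plan ran nothing
        System.out.println(plan.collect()); // [20, 30, 40]
        System.out.println(executions);     // 1 -- collect() triggered execution
    }
}
```

In the same spirit, a Flink DataSet program only describes the dataflow; execution is triggered by a sink or by collect().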