From bdc980095c1c56f35cfa4cdad58e8caf03df7515 Mon Sep 17 00:00:00 2001 From: Jason Gauci Date: Mon, 17 Feb 2014 16:33:38 -0800 Subject: [PATCH 1/4] Enforce crunch.max.reducers in all cases --- .../java/org/apache/crunch/util/PartitionUtils.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java index 25f88660..cdcc4018 100644 --- a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java +++ b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java @@ -37,7 +37,12 @@ public class PartitionUtils { public static int getRecommendedPartitions(PCollection pcollection) { Configuration conf = pcollection.getPipeline().getConfiguration(); - int recommended = getRecommendedPartitions(pcollection, conf); + return getRecommendedPartitions(pcollection, conf); + } + + public static int getRecommendedPartitions(PCollection pcollection, Configuration conf) { + long bytesPerTask = conf.getLong(BYTES_PER_REDUCE_TASK, DEFAULT_BYTES_PER_REDUCE_TASK); + int recommended = 1 + (int) (pcollection.getSize() / bytesPerTask); int maxRecommended = conf.getInt(MAX_REDUCERS, DEFAULT_MAX_REDUCERS); if (maxRecommended > 0 && recommended > maxRecommended) { return maxRecommended; @@ -45,9 +50,4 @@ public static int getRecommendedPartitions(PCollection pcollection) { return recommended; } } - - public static int getRecommendedPartitions(PCollection pcollection, Configuration conf) { - long bytesPerTask = conf.getLong(BYTES_PER_REDUCE_TASK, DEFAULT_BYTES_PER_REDUCE_TASK); - return 1 + (int) (pcollection.getSize() / bytesPerTask); - } } From 4bd8b7a13b893ddb89c11d657ba5347b92fe5a88 Mon Sep 17 00:00:00 2001 From: Jason Gauci Date: Thu, 10 Apr 2014 23:13:15 -0700 Subject: [PATCH 2/4] Added aggregate(...) to PCollection --- .../java/org/apache/crunch/PCollection.java | 5 ++ .../impl/dist/collect/PCollectionImpl.java | 9 +++ .../impl/mem/collect/MemCollection.java | 6 ++ .../java/org/apache/crunch/lib/Aggregate.java | 13 ++++ .../crunch/examples/TotalWordCount.java | 78 +++++++++++++++++++ 5 files changed, 111 insertions(+) create mode 100644 crunch-examples/src/main/java/org/apache/crunch/examples/TotalWordCount.java diff --git a/crunch-core/src/main/java/org/apache/crunch/PCollection.java b/crunch-core/src/main/java/org/apache/crunch/PCollection.java index 2d62d003..1d3598c5 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/PCollection.java @@ -267,4 +267,9 @@ PTable parallelDo(String name, DoFn> doFn, PTableType * Returns a {@code PObject} of the minimum element of this instance. */ PObject min(); + + /** + * Returns a {@code PObject} of an aggregate of this instance. + */ + PObject aggregate(Aggregator aggregator); } diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java index ee820f0c..9dc1c570 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java @@ -19,6 +19,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; + +import org.apache.crunch.Aggregator; import org.apache.crunch.CachingOptions; import org.apache.crunch.DoFn; import org.apache.crunch.FilterFn; @@ -31,12 +33,14 @@ import org.apache.crunch.ReadableData; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; +import org.apache.crunch.fn.Aggregators; import org.apache.crunch.fn.ExtractKeyFn; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.lib.Aggregate; import org.apache.crunch.materialize.pobject.CollectionPObject; +import org.apache.crunch.materialize.pobject.FirstElementPObject; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; @@ -259,6 +263,11 @@ public PObject max() { public PObject min() { return Aggregate.min(this); } + + @Override + public PObject aggregate(Aggregator aggregator) { + return Aggregate.aggregate(this, aggregator); + } @Override public PTypeFamily getTypeFamily() { diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java index 81433eb9..c586fa52 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -25,6 +25,7 @@ import javassist.util.proxy.MethodHandler; import javassist.util.proxy.ProxyFactory; +import org.apache.crunch.Aggregator; import org.apache.crunch.CachingOptions; import org.apache.crunch.DoFn; import org.apache.crunch.FilterFn; @@ -240,6 +241,11 @@ public PObject min() { return Aggregate.min(this); } + @Override + public PObject aggregate(Aggregator aggregator) { + return Aggregate.aggregate(this, aggregator); + } + @Override public PCollection filter(FilterFn filterFn) { return parallelDo(filterFn, getPType()); diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java index d8388b33..4ec707aa 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.PriorityQueue; +import org.apache.crunch.Aggregator; import org.apache.crunch.CombineFn; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; @@ -277,4 +278,16 @@ public Collection map(Iterable values) { } }, tf.collections(collect.getValueType())); } + + public static PObject aggregate(PCollection collect, Aggregator aggregator) { + PTypeFamily tf = collect.getTypeFamily(); + PCollection aggregation = collect.parallelDo("Aggregate.aggregator", new MapFn>() { + public Pair map(S input) { + return Pair.of(0L, input); + } + }, tf.tableOf(tf.longs(), collect.getPType())) + .groupByKey(1) + .combineValues(aggregator).values(); + return new FirstElementPObject(aggregation); + } } diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/TotalWordCount.java b/crunch-examples/src/main/java/org/apache/crunch/examples/TotalWordCount.java new file mode 100644 index 00000000..a4309eed --- /dev/null +++ b/crunch-examples/src/main/java/org/apache/crunch/examples/TotalWordCount.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.examples; + +import java.io.Serializable; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.PObject; +import org.apache.crunch.PTable; +import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineResult; +import org.apache.crunch.fn.Aggregators; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class TotalWordCount extends Configured implements Tool, Serializable { + public int run(String[] args) throws Exception { + if (args.length != 1) { + System.err.println(); + System.err.println("Usage: " + this.getClass().getName() + " [generic options] input"); + System.err.println(); + GenericOptionsParser.printGenericCommandUsage(System.err); + return 1; + } + // Create an object to coordinate pipeline creation and execution. + Pipeline pipeline = new MRPipeline(TotalWordCount.class, getConf()); + // Reference a given text file as a collection of Strings. + PCollection lines = pipeline.readTextFile(args[0]); + + // Define a function that splits each line in a PCollection of Strings into + // a + // PCollection made up of the individual words in the file. + PCollection numberOfWords = lines.parallelDo(new DoFn() { + public void process(String line, Emitter emitter) { + emitter.emit((long)line.split("\\s+").length); + } + }, Writables.longs()); // Indicates the serialization format + + // The aggregate method groups a collection into a single PObject. + PObject totalCount = numberOfWords.aggregate(Aggregators.SUM_LONGS()); + + // Execute the pipeline as a MapReduce. + PipelineResult result = pipeline.run(); + + System.out.println("Total number of words: " + totalCount.getValue()); + + pipeline.done(); + + return result.succeeded() ? 0 : 1; + } + + public static void main(String[] args) throws Exception { + int result = ToolRunner.run(new Configuration(), new TotalWordCount(), args); + System.exit(result); + } +} From 6b965a67319bf262aca6d5db5a9d2803a5257a71 Mon Sep 17 00:00:00 2001 From: Jason Gauci Date: Fri, 11 Apr 2014 06:13:40 -0700 Subject: [PATCH 3/4] Remove numPartitions from Aggregate.aggregate --- crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java index 4ec707aa..a07a2846 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java @@ -286,7 +286,7 @@ public Pair map(S input) { return Pair.of(0L, input); } }, tf.tableOf(tf.longs(), collect.getPType())) - .groupByKey(1) + .groupByKey() .combineValues(aggregator).values(); return new FirstElementPObject(aggregation); } From 9e3fdc244d9b89333228cb19c41390e09f90922b Mon Sep 17 00:00:00 2001 From: Jason Gauci Date: Mon, 14 Apr 2014 11:21:23 -0700 Subject: [PATCH 4/4] Remove unused imports. --- .../org/apache/crunch/impl/dist/collect/PCollectionImpl.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java index 9dc1c570..6e1a713d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java @@ -33,14 +33,12 @@ import org.apache.crunch.ReadableData; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; -import org.apache.crunch.fn.Aggregators; import org.apache.crunch.fn.ExtractKeyFn; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.lib.Aggregate; import org.apache.crunch.materialize.pobject.CollectionPObject; -import org.apache.crunch.materialize.pobject.FirstElementPObject; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily;