diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java index b881e666..2a384573 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java @@ -63,6 +63,16 @@ public static interface ShardingStrategy extends Serializable { public ShardedJoinStrategy(int numShards) { this(new ConstantShardingStrategy(numShards)); } + + /** + * Instantiate with a constant number of shards to use for all keys. + * + * @param numShards number of shards to use + * @param numReducers the amount of reducers to run the join with + */ + public ShardedJoinStrategy(int numShards, int numReducers) { + this(new ConstantShardingStrategy(numShards), numReducers); + } /** * Instantiate with a custom sharding strategy. @@ -74,6 +84,20 @@ public ShardedJoinStrategy(ShardingStrategy shardingStrategy) { this.shardingStrategy = shardingStrategy; } + /** + * Instantiate with a custom sharding strategy and a specified number of reducers. + * + * @param shardingStrategy strategy to be used for sharding + * @param numReducers the amount of reducers to run the join with + */ + public ShardedJoinStrategy(ShardingStrategy shardingStrategy, int numReducers) { + if (numReducers < 1) { + throw new IllegalArgumentException("Num reducers must be > 0, got " + numReducers); + } + this.wrappedJoinStrategy = new DefaultJoinStrategy, U, V>(numReducers); + this.shardingStrategy = shardingStrategy; + } + @Override public PTable> join(PTable left, PTable right, JoinType joinType) {