Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions job/gatherNeoFiles.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/bin/bash
#
# Gather the Neo4j store files produced on HDFS into a fresh local
# ./graph.db directory, concatenating the part files into the single
# store files Neo4j expects.
#
# Usage: gatherNeoFiles.sh <hdfs-source-dir>
#
# Exits non-zero if the source dir is missing or any HDFS copy fails,
# so a partially assembled (corrupt) graph.db is never left behind
# silently.

set -euo pipefail

FROM=${1:?usage: $0 <hdfs-source-dir>}
TO=./graph.db

# Start from a clean local target directory.
rm -rf "${TO}"
mkdir -p "${TO}"

# Plain store/id files copied one-to-one from HDFS.
hadoop fs -get "${FROM}/neostore" "${TO}"
hadoop fs -get "${FROM}/neostore.id" "${TO}"
hadoop fs -get "${FROM}/neostore.nodestore.db.id" "${TO}"
hadoop fs -get "${FROM}/neostore.relationshipstore.db.id" "${TO}"
hadoop fs -get "${FROM}/neostore.relationshiptypestore.db" "${TO}"
hadoop fs -get "${FROM}/neostore.relationshiptypestore.db.id" "${TO}"
hadoop fs -get "${FROM}/neostore.relationshiptypestore.db.names" "${TO}"
hadoop fs -get "${FROM}/neostore.relationshiptypestore.db.names.id" "${TO}"

# Property store files (glob is expanded by hadoop fs, not the local shell).
hadoop fs -get "${FROM}/properties/neostore.propertystore.db.*" "${TO}"

# Reducer part files are concatenated in order into the single store files.
hadoop fs -cat "${FROM}/neostore.nodestore.db/part-r-*" > "${TO}/neostore.nodestore.db"
hadoop fs -cat "${FROM}/neostore.relationshipstore.db/part-r-*" > "${TO}/neostore.relationshipstore.db"

# Property stores are stitched together as header + node parts + edge parts + footer.
hadoop fs -cat "${FROM}/nodeproperties/propertystore.db/props-r-*" "${FROM}/edgeproperties/propertystore.db/props-r-*" "${FROM}/properties/neostore.propertystore.db.footer" > "${TO}/neostore.propertystore.db"
hadoop fs -cat "${FROM}/properties/neostore.propertystore.db.strings.header" "${FROM}/nodeproperties/propertystore.db/strings-r-*" "${FROM}/edgeproperties/propertystore.db/strings-r-*" "${FROM}/properties/neostore.propertystore.db.strings.footer" > "${TO}/neostore.propertystore.db.strings"

# Header/footer fragments were only needed for the concatenation above.
rm -f "${TO}"/*.footer
rm -f "${TO}"/*.header

exit 0
13 changes: 12 additions & 1 deletion job/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,24 @@
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<!-- Test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mrunit</artifactId>
<version>0.20.2-cdh3u2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
Expand All @@ -57,7 +68,7 @@
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j</artifactId>
<version>1.6</version>
<version>1.8.M05</version>
</dependency>
<!-- <dependency>
<groupId>ant</groupId>
Expand Down
11 changes: 10 additions & 1 deletion job/src/main/java/nl/waredingen/graphs/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import nl.waredingen.graphs.bgp.PrepareBgpGraphJob;
import nl.waredingen.graphs.importer.Neo4jImportJob;
import nl.waredingen.graphs.misc.RowNumberJob;
import nl.waredingen.graphs.neo.NeoGraphEdgesJob;
import nl.waredingen.graphs.neo.NeoGraphNodesJob;
import nl.waredingen.graphs.neo.mapreduce.PureMRNodesAndEdgesJob;
import nl.waredingen.graphs.partition.IterateJob;
import nl.waredingen.graphs.partition.IterateWithFlagsJob;
import nl.waredingen.graphs.partition.PrepareJob;
Expand Down Expand Up @@ -38,7 +41,13 @@ public int run(String[] args) throws Exception {
} else if (args[0].equalsIgnoreCase("prepare-bgp")) {
return PrepareBgpGraphJob.runJob(args[1], args[2], args[3]);
} else if (args[0].equalsIgnoreCase("rownumbers")) {
return RowNumberJob.run(args[1], args[2], getConf());
return (int) RowNumberJob.run(args[1], args[2], getConf());
} else if (args[0].equalsIgnoreCase("neographnodes")) {
return NeoGraphNodesJob.runJob(args[1], args[2], args[3]);
} else if (args[0].equalsIgnoreCase("neographedges")) {
return NeoGraphEdgesJob.runJob(args[1], args[2], args[3]);
} else if (args[0].equalsIgnoreCase("neograph")) {
return PureMRNodesAndEdgesJob.run(args[1], args[2], args[3], getConf());
} else {
System.err.println("Wrong arguments!");
System.exit(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ public static void main(String[] args) {
}

private Map<String, String> getConfig() {
if (new File("batch.properties").exists()) {
return BatchInserterImpl.loadProperties("batch.properties");
} else {
// if (new File("batch.properties").exists()) {
// return BatchInserterImpl.loadProperties("batch.properties");
// } else {
return stringMap(
"dump_configuration", "true",
"cache_type", "none",
Expand All @@ -187,7 +187,7 @@ private Map<String, String> getConfig() {
"neostore.relationshipstore.db.mapped_memory", "1000M",
"neostore.propertystore.db.mapped_memory", "1000M",
"neostore.propertystore.db.strings.mapped_memory", "100M");
}
// }
}

private boolean progress() {
Expand Down
11 changes: 6 additions & 5 deletions job/src/main/java/nl/waredingen/graphs/misc/RowNumberJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Task.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class RowNumberJob {
public final static byte COUNTER_MARKER = (byte) 'T';
public final static byte VALUE_MARKER = (byte) 'W';

public static int run(String input, String output, Configuration conf) {
public static long run(String input, String output, Configuration conf) {
try {
Job job = new Job(conf, "Row number generator job.");
job.setGroupingComparatorClass(IndifferentComparator.class);
Expand All @@ -35,7 +36,7 @@ public static int run(String input, String output, Configuration conf) {
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(output));

job.setInputFormatClass(TextInputFormat.class);
Expand All @@ -44,13 +45,13 @@ public static int run(String input, String output, Configuration conf) {
job.setJarByClass(RowNumberJob.class);

job.waitForCompletion(true);

return job.getCounters().findCounter(Counter.REDUCE_OUTPUT_RECORDS).getValue();
} catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace(System.err);
return 1;
}

return 0;
}

static class RowNumberMapper extends Mapper<LongWritable, Text, ByteWritable, RowNumberWritable> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package nl.waredingen.graphs.neo;

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

/**
 * An old-API ({@code org.apache.hadoop.mapred}) output format that writes the
 * raw bytes of each {@link BytesWritable} value straight to the task output
 * file, with no key, no separators and no record framing. Null or
 * {@link NullWritable} values are skipped entirely.
 */
public class ByteBufferOutputFormat<K, V> extends FileOutputFormat<K, V> {

    /** Writes value bytes verbatim; keys are ignored. */
    protected static class ByteRecordWriter<K, V> implements RecordWriter<K, V> {
        private final DataOutputStream out;

        public ByteRecordWriter(DataOutputStream out) {
            this.out = out;
        }

        public void write(K key, V value) throws IOException {
            // Only concrete BytesWritable payloads produce output.
            if (value == null || value instanceof NullWritable) {
                return;
            }
            BytesWritable bytes = (BytesWritable) value;
            // get()/getSize() bound the write to the valid prefix of the
            // backing array (the buffer itself may be larger).
            out.write(bytes.get(), 0, bytes.getSize());
        }

        @Override
        public void close(Reporter reporter) throws IOException {
            out.close();
        }

    }

    @Override
    public RecordWriter<K, V> getRecordWriter(
            FileSystem ignored, JobConf job, String name, Progressable progress)
            throws IOException {
        // Resolve the per-task output path and open it on its own filesystem
        // (the 'ignored' argument is unused by design in the old API).
        Path taskOutput = FileOutputFormat.getTaskOutputPath(job, name);
        FSDataOutputStream stream = taskOutput.getFileSystem(job).create(taskOutput, progress);
        return new ByteRecordWriter<K, V>(new DataOutputStream(stream));
    }
}
50 changes: 50 additions & 0 deletions job/src/main/java/nl/waredingen/graphs/neo/ByteBufferScheme.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package nl.waredingen.graphs.neo;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;

import cascading.scheme.Scheme;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

@SuppressWarnings("serial")
public class ByteBufferScheme extends Scheme {

@Override
public void sink(TupleEntry tupleEntry, OutputCollector outputCollector)
throws IOException {
Fields sinkFields = getSinkFields();
Tuple result = sinkFields != null ? tupleEntry.selectTuple(sinkFields) : tupleEntry.getTuple();

ByteBuffer bb = (ByteBuffer) result.getObject(0);
byte[] ba = bb.array();
BytesWritable bw = new BytesWritable();
bw.set(ba, 0, ba.length);
outputCollector.collect(NullWritable.get(), bw);
}

@Override
public void sinkInit(Tap tap, JobConf jobconf) throws IOException {
jobconf.setOutputFormat(ByteBufferOutputFormat.class);
}

@Override
public Tuple source(Object obj, Object obj1) {
// TODO Auto-generated method stub
return null;
}

@Override
public void sourceInit(Tap tap, JobConf jobconf) throws IOException {
// TODO Auto-generated method stub

}

}
Loading