Chapter 35. JanusGraph with TinkerPop’s Hadoop-Gremlin

JanusGraph-Hadoop works with TinkerPop’s hadoop-gremlin package for general-purpose OLAP.

The following three-step example shows basic integrated JanusGraph-TinkerPop functionality:

  1. Manually define the schema, then load the Grateful Dead graph from a TinkerPop Kryo-serialized binary file.
  2. Run a VertexProgram to compute PageRank values, writing the derived graph to output/~g.
  3. Read the derived graph vertices and their computed rank values.

Note

The examples in this chapter are based on running Spark in local mode. Additional configuration is required when using Spark in standalone mode or when running Spark on YARN or Mesos.

35.1. Defining schema and loading data

bin/gremlin.sh

         \,,,/
         (o o)
-----oOOo-(3)-oOOo-----
plugin activated: janusgraph.imports
gremlin> :plugin use tinkerpop.hadoop
==>tinkerpop.hadoop activated
gremlin> :plugin use tinkerpop.spark
==>tinkerpop.spark activated
gremlin> :load data/grateful-dead-janusgraph-schema.groovy
==>true
==>true
gremlin> graph = JanusGraphFactory.open('conf/janusgraph-cassandra.properties')
==>standardjanusgraph[cassandrathrift:[127.0.0.1]]
gremlin> defineGratefulDeadSchema(graph)
==>null
gremlin> graph.close()
==>null
gremlin> if (!hdfs.exists('data/grateful-dead.kryo')) hdfs.copyFromLocal('data/grateful-dead.kryo','data/grateful-dead.kryo')
==>null
gremlin> graph = GraphFactory.open('conf/hadoop-graph/hadoop-load.properties')
==>hadoopgraph[gryoinputformat->nulloutputformat]
gremlin> blvp = BulkLoaderVertexProgram.build().writeGraph('conf/janusgraph-cassandra.properties').create(graph)
==>BulkLoaderVertexProgram[bulkLoader=IncrementalBulkLoader,vertexIdProperty=bulkLoader.vertex.id,userSuppliedIds=false,keepOriginalIds=true,batchSize=0]
gremlin> graph.compute(SparkGraphComputer).program(blvp).submit().get()
...
==>result[hadoopgraph[gryoinputformat->nulloutputformat],memory[size:0]]
gremlin> graph.close()
==>null
gremlin> graph = GraphFactory.open('conf/hadoop-graph/read-cassandra.properties')
==>hadoopgraph[cassandrainputformat->gryooutputformat]
gremlin> g = graph.traversal().withComputer(SparkGraphComputer)
==>graphtraversalsource[hadoopgraph[cassandrainputformat->gryooutputformat], sparkgraphcomputer]
gremlin> g.V().count()
...
==>808

# hadoop-load.properties

#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.inputLocation=./data/grateful-dead.kryo
gremlin.hadoop.outputLocation=output
gremlin.hadoop.jarsInDistributedCache=true

#
# GiraphGraphComputer Configuration
#
giraph.minWorkers=2
giraph.maxWorkers=2
giraph.useOutOfCoreGraph=true
giraph.useOutOfCoreMessages=true
mapred.map.child.java.opts=-Xmx1024m
mapred.reduce.child.java.opts=-Xmx1024m
giraph.numInputThreads=4
giraph.numComputeThreads=4
giraph.maxMessagesInMemory=100000

#
# SparkGraphComputer Configuration
#
spark.master=local[*]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
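
The console session above also opens conf/hadoop-graph/read-cassandra.properties, which this chapter does not list. As a rough sketch of what such a file may contain, the fragment below pairs JanusGraph's Cassandra input format with Gryo output, consistent with the hadoopgraph[cassandrainputformat->gryooutputformat] line in the session. The host name, port, keyspace, partitioner, and Spark settings are assumptions; check them against the file shipped with your distribution and against conf/janusgraph-cassandra.properties.

```properties
# Sketch of read-cassandra.properties (values are assumptions, not the shipped file)
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.janusgraph.hadoop.formats.cassandra.CassandraInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output

# Storage settings handed to the JanusGraph input format; they must match the
# backend the data was loaded into (assumed: local Cassandra over Thrift)
janusgraphmr.ioformat.conf.storage.backend=cassandra
janusgraphmr.ioformat.conf.storage.hostname=localhost
janusgraphmr.ioformat.conf.storage.port=9160
janusgraphmr.ioformat.conf.storage.cassandra.keyspace=janusgraph

# Partitioner used by the Cassandra cluster (assumed: the Cassandra default)
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner

# SparkGraphComputer settings, local mode as in the rest of this chapter
spark.master=local[*]
spark.serializer=org.apache.spark.serializer.KryoSerializer
```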

// grateful-dead-janusgraph-schema.groovy

def defineGratefulDeadSchema(janusGraph) {
    m = janusGraph.openManagement()
    // vertex labels
    artist = m.makeVertexLabel("artist").make()
    song   = m.makeVertexLabel("song").make()
    // edge labels
    sungBy     = m.makeEdgeLabel("sungBy").make()
    writtenBy  = m.makeEdgeLabel("writtenBy").make()
    followedBy = m.makeEdgeLabel("followedBy").make()
    // vertex and edge properties
    blid         = m.makePropertyKey("bulkLoader.vertex.id").dataType(Long.class).make()
    name         = m.makePropertyKey("name").dataType(String.class).make()
    songType     = m.makePropertyKey("songType").dataType(String.class).make()
    performances = m.makePropertyKey("performances").dataType(Integer.class).make()
    weight       = m.makePropertyKey("weight").dataType(Integer.class).make()
    // global indices
    m.buildIndex("byBulkLoaderVertexId", Vertex.class).addKey(blid).buildCompositeIndex()
    m.buildIndex("artistsByName", Vertex.class).addKey(name).indexOnly(artist).buildCompositeIndex()
    m.buildIndex("songsByName", Vertex.class).addKey(name).indexOnly(song).buildCompositeIndex()
    // vertex-centric indices
    m.buildEdgeIndex(followedBy, "followedByWeight", Direction.BOTH, Order.decr, weight)
    m.commit()
}
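
To confirm that the schema script took effect before starting the bulk load, the management API can be queried from the same Gremlin Console session. The following is a minimal sketch, assuming the Cassandra backend configured in conf/janusgraph-cassandra.properties is reachable; it only reads schema metadata and changes nothing.

```groovy
// Sketch only: assumes the Gremlin Console session above and a running Cassandra backend.
graph = JanusGraphFactory.open('conf/janusgraph-cassandra.properties')
m = graph.openManagement()

// Each check returns true if defineGratefulDeadSchema() created the element.
m.containsVertexLabel('artist')
m.containsRelationType('followedBy')
m.containsGraphIndex('artistsByName')

// Look up one composite index and report its status for the indexed key.
idx = m.getGraphIndex('songsByName')
idx.getIndexStatus(m.getPropertyKey('name'))

// Nothing was modified, so discard the management transaction.
m.rollback()
graph.close()
```

Closing the graph afterwards mirrors the session above, which closes the JanusGraph instance before opening the HadoopGraph used for loading.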

35.2. Running PageRank

A fully functional example of the PageRankVertexProgram can be found in the VertexProgram section of the TinkerPop docs.
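
For orientation, steps 2 and 3 of the example could look roughly like the console sketch below. It is not taken from the TinkerPop documentation: it assumes the session from Section 35.1 (plugins activated, data loaded), reuses conf/hadoop-graph/read-cassandra.properties as the input graph, and picks 20 iterations purely for illustration.

```groovy
// Sketch only: run in the Gremlin Console after the load in Section 35.1.
graph = GraphFactory.open('conf/hadoop-graph/read-cassandra.properties')

// Build the vertex program; the iteration count is an illustrative assumption.
prvp = PageRankVertexProgram.build().iterations(20).create(graph)

// Step 2: compute ranks on Spark and persist a new derived graph
// under the configured gremlin.hadoop.outputLocation.
result = graph.compute(SparkGraphComputer).
               program(prvp).
               result(GraphComputer.ResultGraph.NEW).
               persist(GraphComputer.Persist.VERTEX_PROPERTIES).
               submit().get()

// Step 3: read the derived vertices together with their computed rank.
g = result.graph().traversal()
g.V().valueMap('name', PageRankVertexProgram.PAGE_RANK).limit(5)
```

The rank is stored under the property key held in PageRankVertexProgram.PAGE_RANK, so referring to the constant avoids hard-coding the key name.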