From 516be548e285a4cd39963d41832e5c07963e67a2 Mon Sep 17 00:00:00 2001 From: Sean Lobo Date: Sat, 8 Apr 2017 20:04:15 -0700 Subject: [PATCH 01/14] Update Primary Keys to ensure uniqueness --- app/db/CassandraClient.java | 11 ++++++++--- scripts/cassandra/cassandra.cql | 20 ++++++++------------ 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/app/db/CassandraClient.java b/app/db/CassandraClient.java index caab8fd4..ebd86735 100644 --- a/app/db/CassandraClient.java +++ b/app/db/CassandraClient.java @@ -27,11 +27,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class CassandraClient extends DbClient { private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClient.class); @@ -81,6 +79,12 @@ public void insert(String table, List insertValues) { String insert = "insert into " + table + "(" + fields + ") values (" + values + ");"; + // Ensure the following tables have unique entries + if (table.equals("edge") || table.equals("node") || table.equals("graph") || table.equals("structure")) { + insert = insert.substring(0, insert.length() - 1) + "IF NOT EXISTS;"; + } + BoundStatement statement = this.prepareStatement(insert); + BoundStatement statement = bind(insert, insertValues); LOGGER.info("Executing update: " + statement.preparedStatement().getQueryString() + "."); @@ -109,6 +113,7 @@ public CassandraResults equalitySelect(String table, select += " where " + predicatesString; } + // This might not be very efficient https://www.datastax.com/dev/blog/allow-filtering-explained-2 select += " ALLOW FILTERING;"; BoundStatement statement = bind(select, predicatesAndValues); diff --git a/scripts/cassandra/cassandra.cql b/scripts/cassandra/cassandra.cql index a5448811..c677676e 100644 --- a/scripts/cassandra/cassandra.cql +++ b/scripts/cassandra/cassandra.cql @@ -45,10 +45,9 @@ CREATE TABLE IF NOT EXISTS version_history_dag ( -- MODELS CREATE TABLE IF NOT EXISTS structure ( - item_id bigint, + item_id bigint PRIMARY KEY, source_key varchar, - name varchar, - PRIMARY KEY (item_id, source_key) + name varchar ); CREATE TABLE IF NOT EXISTS structure_version ( @@ -85,26 +84,23 @@ CREATE TABLE IF NOT EXISTS rich_version_tag ( ); CREATE TABLE IF NOT EXISTS edge ( - item_id bigint, + item_id bigint PRIMARY KEY, source_key varchar, from_node_id bigint, to_node_id bigint, - name varchar, - PRIMARY KEY (item_id, source_key) + name varchar ); CREATE TABLE IF NOT EXISTS node ( - item_id bigint, + item_id bigint PRIMARY KEY, source_key varchar, - name varchar, - PRIMARY KEY (item_id, source_key) + name varchar ); CREATE TABLE IF NOT EXISTS graph ( - item_id bigint, + item_id bigint PRIMARY KEY, source_key varchar, - name varchar, - PRIMARY KEY (item_id, source_key) + name varchar ); CREATE TABLE IF NOT EXISTS node_version ( From eaf5b8ceb555364e7fbf780fbe12e7a1c39068ed Mon Sep 17 00:00:00 2001 From: Sean Lobo Date: Sat, 8 Apr 2017 20:18:51 -0700 Subject: [PATCH 02/14] Add methods to query and update sets from tables --- app/db/CassandraClient.java | 45 +++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/app/db/CassandraClient.java b/app/db/CassandraClient.java index ebd86735..71730b73 100644 --- a/app/db/CassandraClient.java +++ b/app/db/CassandraClient.java @@ -91,6 +91,32 @@ public void insert(String table, List insertValues) { this.session.execute(statement); } + public void addToSet(String table, String collectionName, + Set value, List predicates) { + String predicatesString = + predicates + .stream() + .map(predicate -> predicate.getField() + " = ?") + .collect(Collectors.joining(" and ")); + String where = " where " + predicatesString + ";"; + String query = "UPDATE " + table + " SET " + collectionName + " = " + collectionName + " + " + " ? " + where; + + BoundStatement statement = this.prepareStatement(query); + + // Cannot use normal setValue method for collections: https://datastax-oss.atlassian.net/browse/JAVA-185 + statement.bind(value); + + int index = 1; + for (DbDataContainer container : predicates) { + CassandraClient.setValue(statement, container.getValue(), container.getGroundType(), index); + + index++; + } + + LOGGER.info("Executing update: " + statement.preparedStatement().getQueryString() + "."); + this.session.execute(statement); + } + /** * Retrieve rows based on a set of predicates. * @@ -124,6 +150,25 @@ public CassandraResults equalitySelect(String table, return new CassandraResults(resultSet); } + public CassandraResults selectWhereCollectionContains( + String table, List projection, String collectionName, DbDataContainer value) + throws EmptyResultException { + String query = "SELECT " + String.join(", ", projection) + " FROM " + table + " WHERE " + collectionName + + " CONTAINS ?;"; + + BoundStatement statement = this.prepareStatement(query); + CassandraClient.setValue(statement, value.getValue(), value.getGroundType(), 0); + + LOGGER.info("Executing query: " + statement.preparedStatement().getQueryString() + "."); + ResultSet resultSet = this.session.execute(statement); + + if (resultSet == null || resultSet.isExhausted()) { + throw new EmptyResultException("No results found for query: " + statement.toString()); + } + + return new CassandraResults(resultSet); + } + /** * Execute an update statement in Cassandra. * From 86c63aefb96dac3273b6b9660e10d17abc7c821c Mon Sep 17 00:00:00 2001 From: Sean Lobo Date: Sat, 8 Apr 2017 20:36:51 -0700 Subject: [PATCH 03/14] Encapsulate Cassandra version_successor_dag into version_successor --- .../CassandraVersionHistoryDagFactory.java | 32 ++++++------- .../CassandraVersionSuccessorFactory.java | 20 ++++++++ scripts/cassandra/cassandra.cql | 11 ++--- scripts/cassandra/drop_cassandra.cql | 47 +++++++++---------- 4 files changed, 61 insertions(+), 49 deletions(-) diff --git a/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java b/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java index f10cbcb8..4d1fc4b4 100644 --- a/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java +++ b/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java @@ -61,22 +61,15 @@ public VersionHistoryDag create(long itemId) throws Groun @Override public VersionHistoryDag retrieveFromDatabase(long itemId) throws GroundException { - List predicates = new ArrayList<>(); - predicates.add(new DbDataContainer("item_id", GroundType.LONG, itemId)); - CassandraResults resultSet = this.dbClient.equalitySelect("version_history_dag", DbClient.SELECT_STAR, - predicates); + List> edges; - if (resultSet.isEmpty()) { - return new VersionHistoryDag(itemId, new ArrayList<>()); + try { + edges = this.versionSuccessorFactory.retrieveFromDatabaseByItemId(itemId); + } catch (EmptyResultException e) { + // do nothing, this just means no version have been added yet. + return VersionHistoryDagFactory.construct(itemId, new ArrayList>()); } - - List> edges = new ArrayList<>(); - do { - edges.add(this.versionSuccessorFactory.retrieveFromDatabase(resultSet - .getLong("version_successor_id"))); - } while (resultSet.next()); - - return new VersionHistoryDag(itemId, edges); + return VersionHistoryDagFactory.construct(itemId, edges); } /** @@ -93,12 +86,15 @@ public void addEdge(VersionHistoryDag dag, long parentId, long childId, long ite throws GroundException { VersionSuccessor successor = this.versionSuccessorFactory.create(parentId, childId); - List insertions = new ArrayList<>(); - insertions.add(new DbDataContainer("item_id", GroundType.LONG, itemId)); - insertions.add(new DbDataContainer("version_successor_id", GroundType.LONG, successor.getId())); + // Adding to the entry with id = successor.getId() + List predicate = new ArrayList<>(); + predicate.add(new DbDataContainer("id", GroundType.LONG, successor.getId())); - this.dbClient.insert("version_history_dag", insertions); + // To add to a set column, must pass in a set containing the value(s) to add. See CasandraClient.addToSet + Set setValues = new HashSet<>(); + setValues.add(itemId); + this.dbClient.addToSet("version_successor", "item_id_set", setValues, predicate); dag.addEdge(parentId, childId, successor.getId()); } diff --git a/app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java b/app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java index 08d71001..703bac33 100644 --- a/app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java +++ b/app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java @@ -66,6 +66,26 @@ public VersionSuccessor create(long fromId, long toId) return new VersionSuccessor<>(dbId, toId, fromId); } + public List> retrieveFromDatabaseByItemId(long itemId) throws GroundException, EmptyResultException { + CassandraResults resultSet; + + List> versionSuccessors = new ArrayList<>(); + resultSet = this.dbClient.selectWhereCollectionContains( + "version_successor", + DbClient.SELECT_STAR, + "item_id_set", + new DbDataContainer(null, GroundType.LONG, itemId)); + + do { + long id = resultSet.getLong("id"); + long fromId = resultSet.getLong("from_version_id"); + long toId = resultSet.getLong("to_version_id"); + versionSuccessors.add(VersionSuccessorFactory.construct(id, fromId, toId)); + } while (resultSet.next()); + + return versionSuccessors; + } + /** * Retrieve a version successor from the database. * diff --git a/scripts/cassandra/cassandra.cql b/scripts/cassandra/cassandra.cql index c677676e..3b0b80ed 100644 --- a/scripts/cassandra/cassandra.cql +++ b/scripts/cassandra/cassandra.cql @@ -21,9 +21,12 @@ CREATE TABLE IF NOT EXISTS version ( CREATE TABLE IF NOT EXISTS version_successor ( id bigint PRIMARY KEY, from_version_id bigint, - to_version_id bigint + to_version_id bigint, + item_id_set set ); +create index version_successor_ind on version_successor (item_id_set); + CREATE TABLE IF NOT EXISTS item ( id bigint PRIMARY KEY ); @@ -36,12 +39,6 @@ CREATE TABLE IF NOT EXISTS item_tag ( PRIMARY KEY (item_id, key) ); -CREATE TABLE IF NOT EXISTS version_history_dag ( - item_id bigint, - version_successor_id bigint, - PRIMARY KEY(item_id, version_successor_id) -); - -- MODELS CREATE TABLE IF NOT EXISTS structure ( diff --git a/scripts/cassandra/drop_cassandra.cql b/scripts/cassandra/drop_cassandra.cql index 4d273e0d..0a3ab660 100644 --- a/scripts/cassandra/drop_cassandra.cql +++ b/scripts/cassandra/drop_cassandra.cql @@ -12,27 +12,26 @@ * limitations under the License. */ -DROP TABLE lineage_graph_version_edge; -DROP TABLE lineage_graph_version; -DROP TABLE lineage_graph; -DROP TABLE lineage_edge_version; -DROP TABLE lineage_edge; -DROP TABLE principal; -DROP TABLE graph_version_edge; -DROP TABLE graph_version; -DROP TABLE edge_version; -DROP TABLE node_version; -DROP TABLE graph; -DROP TABLE node; -DROP TABLE edge; -DROP TABLE rich_version_tag; -DROP TABLE rich_version_external_parameter; -DROP TABLE rich_version; -DROP TABLE structure_version_attribute; -DROP TABLE structure_version; -DROP TABLE structure; -DROP TABLE version_history_dag; -DROP TABLE item_tag; -DROP TABLE item; -DROP TABLE version_successor; -DROP TABLE version; +drop table lineage_graph_version_edge; +drop table lineage_graph_version; +drop table lineage_graph; +drop table lineage_edge_version; +drop table lineage_edge; +drop table principal; +drop table graph_version_edge; +drop table graph_version; +drop table edge_version; +drop table node_version; +drop table graph; +drop table node; +drop table edge; +drop table rich_version_tag; +drop table rich_version_external_parameter; +drop table rich_version; +drop table structure_version_attribute; +drop table structure_version; +drop table structure; +drop table item_tag; +drop table item; +drop table version_successor; +drop table version; From 6cff1d06c8fa836edc0bab28a1ec32419b00ad0c Mon Sep 17 00:00:00 2001 From: Sean Lobo Date: Sun, 9 Apr 2017 19:30:35 -0700 Subject: [PATCH 04/14] Document set methods in CassandraClient --- app/db/CassandraClient.java | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/app/db/CassandraClient.java b/app/db/CassandraClient.java index 71730b73..291cd86b 100644 --- a/app/db/CassandraClient.java +++ b/app/db/CassandraClient.java @@ -91,20 +91,27 @@ public void insert(String table, List insertValues) { this.session.execute(statement); } - public void addToSet(String table, String collectionName, - Set value, List predicates) { + /** + * + * @param table the table to update + * @param setName the name of the set (column) being appended to + * @param values a set containing all values to be added. Set type must match with column type + * @param predicates values specifying which row will be updated + */ + public void addToSet(String table, String setName, + Set values, List predicates) { String predicatesString = predicates .stream() .map(predicate -> predicate.getField() + " = ?") .collect(Collectors.joining(" and ")); - String where = " where " + predicatesString + ";"; - String query = "UPDATE " + table + " SET " + collectionName + " = " + collectionName + " + " + " ? " + where; + String whereString = " where " + predicatesString + ";"; + String query = "UPDATE " + table + " SET " + setName + " = " + setName + " + " + " ? " + whereString; BoundStatement statement = this.prepareStatement(query); // Cannot use normal setValue method for collections: https://datastax-oss.atlassian.net/browse/JAVA-185 - statement.bind(value); + statement.bind(values); int index = 1; for (DbDataContainer container : predicates) { @@ -150,10 +157,18 @@ public CassandraResults equalitySelect(String table, return new CassandraResults(resultSet); } + /** + * + * @param table the table to query + * @param projection the set of columns to retrieve + * @param setName the name of the set we will search + * @param value the value searched for in the set + * @throws EmptyResultException + */ public CassandraResults selectWhereCollectionContains( - String table, List projection, String collectionName, DbDataContainer value) + String table, List projection, String setName, DbDataContainer value) throws EmptyResultException { - String query = "SELECT " + String.join(", ", projection) + " FROM " + table + " WHERE " + collectionName + String query = "SELECT " + String.join(", ", projection) + " FROM " + table + " WHERE " + setName + " CONTAINS ?;"; BoundStatement statement = this.prepareStatement(query); From 543f47c5c673c3a3f38e0e3fa0db9d6227a5dbe9 Mon Sep 17 00:00:00 2001 From: Sean Lobo Date: Thu, 13 Apr 2017 18:52:46 -0700 Subject: [PATCH 05/14] Make all tests pass --- .../cassandra/CassandraVersionHistoryDagFactory.java | 6 +++--- .../cassandra/CassandraVersionSuccessorFactory.java | 9 +++++++-- app/db/CassandraClient.java | 12 +++--------- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java b/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java index 4d1fc4b4..53144329 100644 --- a/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java +++ b/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java @@ -65,11 +65,11 @@ public VersionHistoryDag retrieveFromDatabase(long itemId try { edges = this.versionSuccessorFactory.retrieveFromDatabaseByItemId(itemId); - } catch (EmptyResultException e) { + } catch (GroundException e) { // do nothing, this just means no version have been added yet. - return VersionHistoryDagFactory.construct(itemId, new ArrayList>()); + return new VersionHistoryDag(itemId, new ArrayList<>()); } - return VersionHistoryDagFactory.construct(itemId, edges); + return new VersionHistoryDag(itemId, edges); } /** diff --git a/app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java b/app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java index 703bac33..f0c04d5f 100644 --- a/app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java +++ b/app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java @@ -66,7 +66,8 @@ public VersionSuccessor create(long fromId, long toId) return new VersionSuccessor<>(dbId, toId, fromId); } - public List> retrieveFromDatabaseByItemId(long itemId) throws GroundException, EmptyResultException { + public List> retrieveFromDatabaseByItemId(long itemId) + throws GroundException { CassandraResults resultSet; List> versionSuccessors = new ArrayList<>(); @@ -76,11 +77,15 @@ public List> retrieveFromDatabaseByItemI "item_id_set", new DbDataContainer(null, GroundType.LONG, itemId)); + if (resultSet.isEmpty()) { + throw new GroundException("No VersionSuccessor found with itemId " + itemId + "."); + } + do { long id = resultSet.getLong("id"); long fromId = resultSet.getLong("from_version_id"); long toId = resultSet.getLong("to_version_id"); - versionSuccessors.add(VersionSuccessorFactory.construct(id, fromId, toId)); + versionSuccessors.add(new VersionSuccessor<>(id, fromId, toId)); } while (resultSet.next()); return versionSuccessors; diff --git a/app/db/CassandraClient.java b/app/db/CassandraClient.java index 291cd86b..94bfcde4 100644 --- a/app/db/CassandraClient.java +++ b/app/db/CassandraClient.java @@ -161,14 +161,12 @@ public CassandraResults equalitySelect(String table, * * @param table the table to query * @param projection the set of columns to retrieve - * @param setName the name of the set we will search + * @param collectionName the name of the set we will search * @param value the value searched for in the set - * @throws EmptyResultException */ public CassandraResults selectWhereCollectionContains( - String table, List projection, String setName, DbDataContainer value) - throws EmptyResultException { - String query = "SELECT " + String.join(", ", projection) + " FROM " + table + " WHERE " + setName + String table, List projection, String collectionName, DbDataContainer value) { + String query = "SELECT " + String.join(", ", projection) + " FROM " + table + " WHERE " + collectionName + " CONTAINS ?;"; BoundStatement statement = this.prepareStatement(query); @@ -177,10 +175,6 @@ public CassandraResults selectWhereCollectionContains( LOGGER.info("Executing query: " + statement.preparedStatement().getQueryString() + "."); ResultSet resultSet = this.session.execute(statement); - if (resultSet == null || resultSet.isExhausted()) { - throw new EmptyResultException("No results found for query: " + statement.toString()); - } - return new CassandraResults(resultSet); } From 751393fdfdf240bc931a6a3e570ce7b6e16cd60b Mon Sep 17 00:00:00 2001 From: Sean Lobo Date: Fri, 14 Apr 2017 12:53:16 -0700 Subject: [PATCH 06/14] Add methods to update/delete/query Cassandra Maps/Sets --- app/db/CassandraClient.java | 81 ++++++++++++++++++++++++++++++++++++ app/db/CassandraResults.java | 19 +++++++++ 2 files changed, 100 insertions(+) diff --git a/app/db/CassandraClient.java b/app/db/CassandraClient.java index 94bfcde4..91c9cb95 100644 --- a/app/db/CassandraClient.java +++ b/app/db/CassandraClient.java @@ -124,6 +124,32 @@ public void addToSet(String table, String setName, this.session.execute(statement); } + public void addToMap(String table, String mapName, Map keyValues, + List predicates) { + String predicatesString = + predicates + .stream() + .map(predicate -> predicate.getField() + " = ?") + .collect(Collectors.joining(" and ")); + String whereString = " where " + predicatesString + ";"; + String query = "UPDATE " + table + " SET " + mapName + " = " + mapName + " + " + " ? " + whereString; + + BoundStatement statement = this.prepareStatement(query); + + // Cannot use normal setValue method for collections: https://datastax-oss.atlassian.net/browse/JAVA-185 + statement.bind(keyValues); + + int index = 1; + for (DbDataContainer container : predicates) { + CassandraClient.setValue(statement, container.getValue(), container.getGroundType(), index); + + index++; + } + + LOGGER.info("Executing update: " + statement.preparedStatement().getQueryString() + "."); + this.session.execute(statement); + } + /** * Retrieve rows based on a set of predicates. * @@ -240,6 +266,61 @@ private BoundStatement bind(String statement, List... predicate return boundStatement; } + /** + * Deletes a column from a table (sets column to null) + * @param predicates the predicates used to match row(s) to delete from + * @param table the table to delete from + * @param columnName the column to delete + */ + public void deleteColumn(List predicates, String table, String columnName) { + String deleteString = "delete " + columnName + " from " + table + " "; + + String predicateString = predicates.stream().map(predicate -> predicate.getField() + " = ? ") + .collect(Collectors.joining(" and ")); + + deleteString += "where " + predicateString; + + BoundStatement statement = this.prepareStatement(deleteString); + + int index = 0; + for (DbDataContainer predicate : predicates) { + CassandraClient.setValue(statement, predicate.getValue(), predicate.getGroundType(), index); + index++; + } + + this.session.execute(statement); + } + + /** + * Deletes all keys in the associated map by value + * @param predicates the predicates to match by row + * @param table the table to be deleted from + * @param columnName the column name of the map + * @param values the values in the map to be deleted + */ + public void deleteFromMapByValue(List predicates, String table, + String columnName, List values) { + String deleteString = "delete " + columnName + "[?] from " + table; + String predicateString = predicates.stream().map(predicate -> predicate.getField() + " = ? ") + .collect(Collectors.joining(" and ")); + deleteString += " where " + predicateString; + + for (DbDataContainer val : values) { + BoundStatement statement = this.prepareStatement(deleteString); + + // Set the value to be deleted + CassandraClient.setValue(statement, val.getValue(), val.getGroundType(), 0); + + int index = 1; + for (DbDataContainer predicate : predicates) { + CassandraClient.setValue(statement, predicate.getValue(), predicate.getGroundType(), index); + index++; + } + + this.session.execute(statement); + } + } + @Override public void commit() {} diff --git a/app/db/CassandraResults.java b/app/db/CassandraResults.java index 97290a75..3deccc89 100644 --- a/app/db/CassandraResults.java +++ b/app/db/CassandraResults.java @@ -21,6 +21,9 @@ import exceptions.GroundDbException; import models.versions.GroundType; +import java.util.Map; +import java.util.Set; + public class CassandraResults { private final ResultSet resultSet; private Row currentRow; @@ -92,6 +95,22 @@ public long getLong(String field) throws GroundDbException { } } + public Map getMap(String field, Class keyKlass, Class valueKlass) throws GroundDbException { + try { + return this.currentRow.getMap(field, keyKlass, valueKlass); + } catch (Exception e) { + throw new GroundDbException(e); + } + } + + public Set getSet(String field, Class klass) throws GroundDbException { + try { + return this.currentRow.getSet(field, klass); + } catch (Exception e) { + throw new GroundDbException(e); + } + } + /** * Move on to the next column in the result set. * From 3679d11fdd1e6fac5777ab5e91ff6f1470cf5b9c Mon Sep 17 00:00:00 2001 From: Sean Lobo Date: Fri, 14 Apr 2017 12:57:37 -0700 Subject: [PATCH 07/14] Encapsulate Cassandra structure_version_attribute Move table structure_version_attribute into map column of structure_version --- .../CassandraStructureVersionFactory.java | 32 +++++++------------ .../CassandraVersionHistoryDagFactory.java | 4 +-- scripts/cassandra/cassandra.cql | 10 ++---- scripts/cassandra/drop_cassandra.cql | 1 - 4 files changed, 15 insertions(+), 32 deletions(-) diff --git a/app/dao/models/cassandra/CassandraStructureVersionFactory.java b/app/dao/models/cassandra/CassandraStructureVersionFactory.java index d56c8a35..64ac6713 100644 --- a/app/dao/models/cassandra/CassandraStructureVersionFactory.java +++ b/app/dao/models/cassandra/CassandraStructureVersionFactory.java @@ -30,9 +30,6 @@ import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class CassandraStructureVersionFactory extends CassandraVersionFactory implements StructureVersionFactory { @@ -88,13 +85,11 @@ public StructureVersion create(long structureId, this.dbClient.insert("structure_version", insertions); for (String key : attributes.keySet()) { - List itemInsertions = new ArrayList<>(); - itemInsertions.add(new DbDataContainer("structure_version_id", GroundType.LONG, id)); - itemInsertions.add(new DbDataContainer("key", GroundType.STRING, key)); - itemInsertions.add(new DbDataContainer("type", GroundType.STRING, - attributes.get(key).toString())); - - this.dbClient.insert("structure_version_attribute", itemInsertions); + Map map = new HashMap<>(); + map.put(key, attributes.get(key).toString()); + List predicate = new ArrayList<>(); + predicate.add(new DbDataContainer("id", GroundType.LONG, id)); + this.dbClient.addToMap("structure_version", "key_type_map", map, predicate); } this.structureFactory.update(structureId, id, parentIds); @@ -120,21 +115,16 @@ public StructureVersion retrieveFromDatabase(long id) throws GroundException { predicates); super.verifyResultSet(resultSet, id); - Map attributes = new HashMap<>(); - - List attributePredicates = new ArrayList<>(); - attributePredicates.add(new DbDataContainer("structure_version_id", GroundType.LONG, id)); - CassandraResults attributesSet = this.dbClient.equalitySelect("structure_version_attribute", - DbClient.SELECT_STAR, attributePredicates); + Map tmpAttributes = resultSet.getMap("key_type_map", String.class, String.class); - if (attributesSet.isEmpty()) { + if (tmpAttributes.isEmpty()) { throw new GroundException("No StructureVersion attributes found for id " + id + "."); } - do { - attributes.put(attributesSet.getString("key"), GroundType.fromString(attributesSet - .getString("type"))); - } while (attributesSet.next()); + Map attributes = new HashMap<>(); + for (String key : tmpAttributes.keySet()) { + attributes.put(key, GroundType.fromString(tmpAttributes.get(key))); + } long structureId = resultSet.getLong("structure_id"); diff --git a/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java b/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java index 53144329..2f20ea8c 100644 --- a/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java +++ b/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java @@ -143,8 +143,8 @@ public void truncate(VersionHistoryDag dag, int numLevels, Class tableNamePrefix = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, tableNamePrefix); if (itemType.equals(Structure.class)) { - predicates.add(new DbDataContainer("structure_version_id", GroundType.LONG, id)); - this.dbClient.delete(predicates, "structure_version_attribute"); + predicates.add(new DbDataContainer("id", GroundType.LONG, id)); + this.dbClient.deleteColumn(predicates, "structure_version", "key_type_map"); predicates.clear(); } diff --git a/scripts/cassandra/cassandra.cql b/scripts/cassandra/cassandra.cql index 3b0b80ed..56a14a93 100644 --- a/scripts/cassandra/cassandra.cql +++ b/scripts/cassandra/cassandra.cql @@ -49,14 +49,8 @@ CREATE TABLE IF NOT EXISTS structure ( CREATE TABLE IF NOT EXISTS structure_version ( id bigint PRIMARY KEY, - structure_id bigint -); - -CREATE TABLE IF NOT EXISTS structure_version_attribute ( - structure_version_id bigint, - key varchar, - type varchar, - PRIMARY KEY (structure_version_id, key) + structure_id bigint, + key_type_map map ); CREATE TABLE IF NOT EXISTS rich_version ( diff --git a/scripts/cassandra/drop_cassandra.cql b/scripts/cassandra/drop_cassandra.cql index 0a3ab660..63aa2e8d 100644 --- a/scripts/cassandra/drop_cassandra.cql +++ b/scripts/cassandra/drop_cassandra.cql @@ -28,7 +28,6 @@ drop table edge; drop table rich_version_tag; drop table rich_version_external_parameter; drop table rich_version; -drop table structure_version_attribute; drop table structure_version; drop table structure; drop table item_tag; From 02f494fc9e4bf60967246e98059ffa5f88849835 Mon Sep 17 00:00:00 2001 From: Sean Lobo Date: Fri, 14 Apr 2017 12:59:05 -0700 Subject: [PATCH 08/14] Encapsulate Cassandra graph_version_edge Move graph_version_edge into set column of graph_version (edge_version_set) --- .../CassandraGraphVersionFactory.java | 31 +++++-------------- .../CassandraVersionHistoryDagFactory.java | 4 +-- scripts/cassandra/cassandra.cql | 9 ++---- scripts/cassandra/drop_cassandra.cql | 1 - scripts/cassandra/truncate.cql | 1 - 5 files changed, 12 insertions(+), 34 deletions(-) diff --git a/app/dao/models/cassandra/CassandraGraphVersionFactory.java b/app/dao/models/cassandra/CassandraGraphVersionFactory.java index c5862bf3..13b7013e 100644 --- a/app/dao/models/cassandra/CassandraGraphVersionFactory.java +++ b/app/dao/models/cassandra/CassandraGraphVersionFactory.java @@ -28,13 +28,12 @@ import util.IdGenerator; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class CassandraGraphVersionFactory extends CassandraRichVersionFactory implements GraphVersionFactory { @@ -97,12 +96,12 @@ public GraphVersion create(Map tags, this.dbClient.insert("graph_version", insertions); + List predicate = new ArrayList<>(); + predicate.add(new DbDataContainer("id", GroundType.LONG, id)); for (long edgeVersionId : edgeVersionIds) { - List edgeInsertion = new ArrayList<>(); - edgeInsertion.add(new DbDataContainer("graph_version_id", GroundType.LONG, id)); - edgeInsertion.add(new DbDataContainer("edge_version_id", GroundType.LONG, edgeVersionId)); - - this.dbClient.insert("graph_version_edge", edgeInsertion); + Set edge = new HashSet(); + edge.add(edgeVersionId); + this.dbClient.addToSet("graph_version", "edge_version_set", edge, predicate); } this.graphFactory.update(graphId, id, parentIds); @@ -126,26 +125,12 @@ public GraphVersion retrieveFromDatabase(long id) throws GroundException { List predicates = new ArrayList<>(); predicates.add(new DbDataContainer("id", GroundType.LONG, id)); - List edgePredicate = new ArrayList<>(); - edgePredicate.add(new DbDataContainer("graph_version_id", GroundType.LONG, id)); - CassandraResults resultSet = this.dbClient.equalitySelect("graph_version", DbClient.SELECT_STAR, predicates); - long graphId = resultSet.getLong("graph_id"); - - List edgeVersionIds = new ArrayList<>(); - CassandraResults edgeSet = this.dbClient.equalitySelect("graph_version_edge", - DbClient.SELECT_STAR, - edgePredicate); - - if (!edgeSet.isEmpty()) { - do { - edgeVersionIds.add(edgeSet.getLong("edge_version_id")); - } while (edgeSet.next()); - } + List edgeVersionIds = resultSet.getSet("edge_version_set", Long.class).stream().collect(Collectors.toList()); LOGGER.info("Retrieved graph version " + id + " in graph " + graphId + "."); return new GraphVersion(id, version.getTags(), version.getStructureVersionId(), diff --git a/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java b/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java index 2f20ea8c..3d3c4b43 100644 --- a/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java +++ b/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java @@ -149,8 +149,8 @@ public void truncate(VersionHistoryDag dag, int numLevels, Class } if (itemType.getName().toLowerCase().contains("graph")) { - predicates.add(new DbDataContainer(tableNamePrefix + "_version_id", GroundType.LONG, id)); - this.dbClient.delete(predicates, tableNamePrefix + "_version_edge"); + predicates.add(new DbDataContainer("id", GroundType.LONG, id)); + this.dbClient.deleteColumn(predicates, "graph_version", "edge_version_set"); predicates.clear(); } diff --git a/scripts/cassandra/cassandra.cql b/scripts/cassandra/cassandra.cql index 56a14a93..1e6d6185 100644 --- a/scripts/cassandra/cassandra.cql +++ b/scripts/cassandra/cassandra.cql @@ -110,13 +110,8 @@ CREATE TABLE IF NOT EXISTS edge_version ( CREATE TABLE IF NOT EXISTS graph_version ( id bigint PRIMARY KEY, - graph_id bigint -); - -CREATE TABLE IF NOT EXISTS graph_version_edge ( - graph_version_id bigint, - edge_version_id bigint, - PRIMARY KEY (graph_version_id, edge_version_id) + graph_id bigint, + edge_version_set set ); -- USAGE diff --git a/scripts/cassandra/drop_cassandra.cql b/scripts/cassandra/drop_cassandra.cql index 63aa2e8d..044511dc 100644 --- a/scripts/cassandra/drop_cassandra.cql +++ b/scripts/cassandra/drop_cassandra.cql @@ -18,7 +18,6 @@ drop table lineage_graph; drop table lineage_edge_version; drop table lineage_edge; drop table principal; -drop table graph_version_edge; drop table graph_version; drop table edge_version; drop table node_version; diff --git a/scripts/cassandra/truncate.cql b/scripts/cassandra/truncate.cql index 4c175ce1..8ba4047d 100644 --- a/scripts/cassandra/truncate.cql +++ b/scripts/cassandra/truncate.cql @@ -11,7 +11,6 @@ -- See the License for the specific language governing permissions and -- limitations under the License. - TRUNCATE lineage_graph_version_edge; TRUNCATE lineage_graph_version; TRUNCATE lineage_graph; From b3b6bd7cf049b3f4407cabcdab8990ede4ca04db Mon Sep 17 00:00:00 2001 From: Sean Lobo Date: Fri, 14 Apr 2017 13:21:48 -0700 Subject: [PATCH 09/14] Encapsulate Cassandra lineage_graph_version_edge Move lineage_graph_version_edge into lineage_graph_version (with set lineage_edge_version_id_set) --- .../CassandraLineageGraphVersionFactory.java | 31 +++++++------------ scripts/cassandra/cassandra.cql | 8 ++--- scripts/cassandra/drop_cassandra.cql | 1 - scripts/cassandra/truncate.cql | 3 -- 4 files changed, 13 insertions(+), 30 deletions(-) diff --git a/app/dao/usage/cassandra/CassandraLineageGraphVersionFactory.java b/app/dao/usage/cassandra/CassandraLineageGraphVersionFactory.java index 53a51a56..263a36a7 100644 --- a/app/dao/usage/cassandra/CassandraLineageGraphVersionFactory.java +++ b/app/dao/usage/cassandra/CassandraLineageGraphVersionFactory.java @@ -31,11 +31,11 @@ import util.IdGenerator; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.Set; +import java.util.stream.Collectors; public class CassandraLineageGraphVersionFactory extends CassandraRichVersionFactory @@ -104,13 +104,12 @@ public LineageGraphVersion create(Map tags, this.dbClient.insert("lineage_graph_version", insertions); for (long lineageEdgeVersionId : lineageEdgeVersionIds) { - List lineageEdgeInsertion = new ArrayList<>(); - lineageEdgeInsertion.add(new DbDataContainer("lineage_graph_version_id", GroundType.LONG, - id)); - lineageEdgeInsertion.add(new DbDataContainer("lineage_edge_version_id", GroundType.LONG, - lineageEdgeVersionId)); + List predicates = new ArrayList<>(); + predicates.add(new DbDataContainer("id", GroundType.LONG, id)); + Set edgeValue = new HashSet<>(); + edgeValue.add(lineageEdgeVersionId); - this.dbClient.insert("lineage_graph_version_edge", lineageEdgeInsertion); + this.dbClient.addToSet("lineage_graph_version", "lineage_edge_version_id_set", edgeValue, predicates); } this.lineageGraphFactory.update(lineageGraphId, id, parentIds); @@ -137,8 +136,8 @@ public LineageGraphVersion retrieveFromDatabase(long id) throws GroundException List predicates = new ArrayList<>(); predicates.add(new DbDataContainer("id", GroundType.LONG, id)); - List lineageEdgePredicate = new ArrayList<>(); - lineageEdgePredicate.add(new DbDataContainer("lineage_graph_version_id", GroundType.LONG, + List lineageGraphVersionPredicates = new ArrayList<>(); + lineageGraphVersionPredicates.add(new DbDataContainer("id", GroundType.LONG, id)); CassandraResults resultSet = this.dbClient.equalitySelect("lineage_graph_version", @@ -148,15 +147,7 @@ public LineageGraphVersion retrieveFromDatabase(long id) throws GroundException long lineageGraphId = resultSet.getLong("lineage_graph_id"); - List lineageEdgeVersionIds = new ArrayList<>(); - CassandraResults lineageEdgeSet = this.dbClient.equalitySelect("lineage_graph_version_edge", - DbClient.SELECT_STAR, lineageEdgePredicate); - - if (!lineageEdgeSet.isEmpty()) { - do { - lineageEdgeVersionIds.add(lineageEdgeSet.getLong("lineage_edge_version_id")); - } while (lineageEdgeSet.next()); - } + List lineageEdgeVersionIds = resultSet.getSet("lineage_edge_version_id_set", Long.class).stream().collect(Collectors.toList()); LOGGER.info("Retrieved lineage_graph version " + id + " in lineage_graph " + lineageGraphId + "."); diff --git a/scripts/cassandra/cassandra.cql b/scripts/cassandra/cassandra.cql index 1e6d6185..18bf5839 100644 --- a/scripts/cassandra/cassandra.cql +++ b/scripts/cassandra/cassandra.cql @@ -147,14 +147,10 @@ CREATE TABLE IF NOT EXISTS lineage_graph ( CREATE TABLE IF NOT EXISTS lineage_graph_version ( id bigint PRIMARY KEY, - lineage_graph_id bigint + lineage_graph_id bigint, + lineage_edge_version_id_set set ); -CREATE TABLE IF NOT EXISTS lineage_graph_version_edge ( - lineage_graph_version_id bigint, - lineage_edge_version_id bigint, - PRIMARY KEY (lineage_graph_version_id, lineage_edge_version_id) -); -- CREATE EMPTY VERSION diff --git a/scripts/cassandra/drop_cassandra.cql b/scripts/cassandra/drop_cassandra.cql index 044511dc..ac89eb9b 100644 --- a/scripts/cassandra/drop_cassandra.cql +++ b/scripts/cassandra/drop_cassandra.cql @@ -12,7 +12,6 @@ * limitations under the License. */ -drop table lineage_graph_version_edge; drop table lineage_graph_version; drop table lineage_graph; drop table lineage_edge_version; diff --git a/scripts/cassandra/truncate.cql b/scripts/cassandra/truncate.cql index 8ba4047d..889b2401 100644 --- a/scripts/cassandra/truncate.cql +++ b/scripts/cassandra/truncate.cql @@ -11,13 +11,11 @@ -- See the License for the specific language governing permissions and -- limitations under the License. -TRUNCATE lineage_graph_version_edge; TRUNCATE lineage_graph_version; TRUNCATE lineage_graph; TRUNCATE lineage_edge_version; TRUNCATE lineage_edge; TRUNCATE principal; -TRUNCATE graph_version_edge; TRUNCATE graph_version; TRUNCATE edge_version; TRUNCATE node_version; @@ -27,7 +25,6 @@ TRUNCATE edge; TRUNCATE rich_version_tag; TRUNCATE rich_version_external_parameter; TRUNCATE rich_version; -TRUNCATE structure_version_attribute; TRUNCATE structure_version; TRUNCATE structure; TRUNCATE version_history_dag; From 7c50b4a880cf8710ce942255cfc07c97cddbdb85 Mon Sep 17 00:00:00 2001 From: Sean Lobo Date: Thu, 20 Apr 2017 15:55:22 -0700 Subject: [PATCH 10/14] Add method to delete from Cassandra set --- app/db/CassandraClient.java | 76 ++++++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 22 deletions(-) diff --git a/app/db/CassandraClient.java b/app/db/CassandraClient.java index 91c9cb95..6bf424ef 100644 --- a/app/db/CassandraClient.java +++ b/app/db/CassandraClient.java @@ -100,28 +100,7 @@ public void insert(String table, List insertValues) { */ public void addToSet(String table, String setName, Set values, List predicates) { - String predicatesString = - predicates - .stream() - .map(predicate -> predicate.getField() + " = ?") - .collect(Collectors.joining(" and ")); - String whereString = " where " + predicatesString + ";"; - String query = "UPDATE " + table + " SET " + setName + " = " + setName + " + " + " ? " + whereString; - - BoundStatement statement = this.prepareStatement(query); - - // Cannot use normal setValue method for collections: https://datastax-oss.atlassian.net/browse/JAVA-185 - statement.bind(values); - - int index = 1; - for (DbDataContainer container : predicates) { - CassandraClient.setValue(statement, container.getValue(), container.getGroundType(), index); - - index++; - } - - LOGGER.info("Executing update: " + statement.preparedStatement().getQueryString() + "."); - this.session.execute(statement); + this.modifySet(table, setName, values, predicates, true); } public void addToMap(String table, String mapName, Map keyValues, @@ -266,6 +245,18 @@ private BoundStatement bind(String statement, List... predicate return boundStatement; } + /** + * Deletes values from a set + * @param table the table to update + * @param setName the name of the set (column) being updated + * @param values a set containing all values to be deleted. Set type must match with column type + * @param predicates values specifying which row will be updated + */ + public void deleteFromSet(String table, String setName, + Set values, List predicates) { + this.modifySet(table, setName, values, predicates, false); + } + /** * Deletes a column from a table (sets column to null) * @param predicates the predicates used to match row(s) to delete from @@ -333,6 +324,47 @@ public void close() { this.cluster.close(); } + /** + * Adds or subtracts the specified elements from a set + * @param table the table to update + * @param setName the name of the set (column) being added to or subtracted from + * @param values a set containing all relevant values. Set type must match with column type + * @param predicates values specifying which row will be updated + * @param add true if adding false if subtracting + */ + private void modifySet(String table, String setName, + Set values, List predicates, boolean add) { + String symbol; + if (add) { + symbol = " + "; + } else { + symbol = " - "; + } + + String predicatesString = + predicates + .stream() + .map(predicate -> predicate.getField() + " = ?") + .collect(Collectors.joining(" and ")); + String whereString = " where " + predicatesString + ";"; + String query = "UPDATE " + table + " SET " + setName + " = " + setName + symbol + " ? " + whereString; + + BoundStatement statement = this.prepareStatement(query); + + // Cannot use normal setValue method for collections: https://datastax-oss.atlassian.net/browse/JAVA-185 + statement.bind(values); + + int index = 1; + for (DbDataContainer container : predicates) { + CassandraClient.setValue(statement, container.getValue(), container.getGroundType(), index); + + index++; + } + + LOGGER.info("Executing update: " + statement.preparedStatement().getQueryString() + "."); + this.session.execute(statement); + } + private BoundStatement prepareStatement(String sql) { // Use the cached statement if possible; otherwise, prepare a new statement. PreparedStatement statement = From 812eecd9521c3abf8abb51f7bbd92a71b50f78af Mon Sep 17 00:00:00 2001 From: Sean Lobo Date: Thu, 20 Apr 2017 16:57:30 -0700 Subject: [PATCH 11/14] Fix Style and make tests pass --- .../models/cassandra/CassandraGraphVersionFactory.java | 7 ++++--- .../cassandra/CassandraLineageGraphVersionFactory.java | 5 +++-- .../cassandra/CassandraVersionSuccessorFactory.java | 9 ++++++--- app/db/CassandraClient.java | 4 ++-- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/app/dao/models/cassandra/CassandraGraphVersionFactory.java b/app/dao/models/cassandra/CassandraGraphVersionFactory.java index 13b7013e..d2b3fdbd 100644 --- a/app/dao/models/cassandra/CassandraGraphVersionFactory.java +++ b/app/dao/models/cassandra/CassandraGraphVersionFactory.java @@ -32,7 +32,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CassandraGraphVersionFactory extends CassandraRichVersionFactory @@ -99,7 +100,7 @@ public GraphVersion create(Map tags, List predicate = new ArrayList<>(); predicate.add(new DbDataContainer("id", GroundType.LONG, id)); for (long edgeVersionId : edgeVersionIds) { - Set edge = new HashSet(); + Set edge = new HashSet<>(); edge.add(edgeVersionId); this.dbClient.addToSet("graph_version", "edge_version_set", edge, predicate); } @@ -130,7 +131,7 @@ public GraphVersion retrieveFromDatabase(long id) throws GroundException { predicates); long graphId = resultSet.getLong("graph_id"); - List edgeVersionIds = resultSet.getSet("edge_version_set", Long.class).stream().collect(Collectors.toList()); + List edgeVersionIds = new ArrayList<>(resultSet.getSet("edge_version_set", Long.class)); LOGGER.info("Retrieved graph version " + id + " in graph " + graphId + "."); return new GraphVersion(id, version.getTags(), version.getStructureVersionId(), diff --git a/app/dao/usage/cassandra/CassandraLineageGraphVersionFactory.java b/app/dao/usage/cassandra/CassandraLineageGraphVersionFactory.java index 263a36a7..29d2d284 100644 --- a/app/dao/usage/cassandra/CassandraLineageGraphVersionFactory.java +++ b/app/dao/usage/cassandra/CassandraLineageGraphVersionFactory.java @@ -35,7 +35,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CassandraLineageGraphVersionFactory extends CassandraRichVersionFactory @@ -147,7 +148,7 @@ public LineageGraphVersion retrieveFromDatabase(long id) throws GroundException long lineageGraphId = resultSet.getLong("lineage_graph_id"); - List lineageEdgeVersionIds = resultSet.getSet("lineage_edge_version_id_set", Long.class).stream().collect(Collectors.toList()); + List lineageEdgeVersionIds = new ArrayList<>(resultSet.getSet("lineage_edge_version_id_set", Long.class)); LOGGER.info("Retrieved lineage_graph version " + id + " in lineage_graph " + lineageGraphId + "."); diff --git a/app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java b/app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java index f0c04d5f..dff65ed3 100644 --- a/app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java +++ b/app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java @@ -26,7 +26,9 @@ import util.IdGenerator; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; public class CassandraVersionSuccessorFactory implements VersionSuccessorFactory { private final CassandraClient dbClient; @@ -141,10 +143,11 @@ public void deleteFromDestination(long toId, long itemId) throws GroundException long dbId = resultSet.getLong("id"); predicates.clear(); - predicates.add(new DbDataContainer("item_id", GroundType.LONG, itemId)); - predicates.add(new DbDataContainer("version_successor_id", GroundType.LONG, dbId)); + predicates.add(new DbDataContainer("id", GroundType.LONG, dbId)); + Set value = new HashSet<>(); + value.add(itemId); - this.dbClient.delete(predicates, "version_history_dag"); + this.dbClient.deleteFromSet("version_successor", "item_id_set", value, predicates); predicates.clear(); predicates.add(new DbDataContainer("id", GroundType.LONG, dbId)); diff --git a/app/db/CassandraClient.java b/app/db/CassandraClient.java index 6bf424ef..042956bf 100644 --- a/app/db/CassandraClient.java +++ b/app/db/CassandraClient.java @@ -91,7 +91,7 @@ public void insert(String table, List insertValues) { this.session.execute(statement); } - /** + /** * * @param table the table to update * @param setName the name of the set (column) being appended to @@ -162,7 +162,7 @@ public CassandraResults equalitySelect(String table, return new CassandraResults(resultSet); } - /** + /** * * @param table the table to query * @param projection the set of columns to retrieve From 1a0accedc9b61b70d58337aa89cd98ec25a46692 Mon Sep 17 00:00:00 2001 From: Sean Lobo Date: Tue, 2 May 2017 13:50:56 -0700 Subject: [PATCH 12/14] Fix CassandraClient to pass upstream changes --- .../CassandraStructureVersionFactory.java | 3 + app/db/CassandraClient.java | 84 +++++-------------- scripts/cassandra/cassandra.cql | 2 +- scripts/cassandra/truncate.cql | 1 - 4 files changed, 27 insertions(+), 63 deletions(-) diff --git a/app/dao/models/cassandra/CassandraStructureVersionFactory.java b/app/dao/models/cassandra/CassandraStructureVersionFactory.java index 64ac6713..eaee941a 100644 --- a/app/dao/models/cassandra/CassandraStructureVersionFactory.java +++ b/app/dao/models/cassandra/CassandraStructureVersionFactory.java @@ -30,6 +30,9 @@ import java.util.List; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class CassandraStructureVersionFactory extends CassandraVersionFactory implements StructureVersionFactory { diff --git a/app/db/CassandraClient.java b/app/db/CassandraClient.java index 042956bf..099657ca 100644 --- a/app/db/CassandraClient.java +++ b/app/db/CassandraClient.java @@ -20,7 +20,7 @@ import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; - +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -29,6 +29,8 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CassandraClient extends DbClient { private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClient.class); @@ -83,7 +85,6 @@ public void insert(String table, List insertValues) { if (table.equals("edge") || table.equals("node") || table.equals("graph") || table.equals("structure")) { insert = insert.substring(0, insert.length() - 1) + "IF NOT EXISTS;"; } - BoundStatement statement = this.prepareStatement(insert); BoundStatement statement = bind(insert, insertValues); @@ -115,15 +116,12 @@ public void addToMap(String table, String mapName, Map values = new ArrayList<>(); + values.add(keyValues); + for (DbDataContainer dataContainer : predicates) { + values.add(dataContainer.getValue()); } + statement.bind(values.toArray(new Object[values.size()])); LOGGER.info("Executing update: " + statement.preparedStatement().getQueryString() + "."); this.session.execute(statement); @@ -174,8 +172,9 @@ public CassandraResults selectWhereCollectionContains( String query = "SELECT " + String.join(", ", projection) + " FROM " + table + " WHERE " + collectionName + " CONTAINS ?;"; - BoundStatement statement = this.prepareStatement(query); - CassandraClient.setValue(statement, value.getValue(), value.getGroundType(), 0); + List predicates = new ArrayList<>(); + predicates.add(value); + BoundStatement statement = bind(query, predicates); LOGGER.info("Executing query: " + statement.preparedStatement().getQueryString() + "."); ResultSet resultSet = this.session.execute(statement); @@ -237,14 +236,6 @@ public void delete(List predicates, String table) { this.session.execute(statement); } - private BoundStatement bind(String statement, List... predicates) { - BoundStatement boundStatement = this.prepareStatement(statement); - List values = Arrays.stream(predicates).flatMap(Collection::stream) - .map(p-> p.getValue()).collect(Collectors.toList()); - boundStatement.bind(values.toArray(new Object[values.size()])); - return boundStatement; - } - /** * Deletes values from a set * @param table the table to update @@ -271,45 +262,19 @@ public void deleteColumn(List predicates, String table, String deleteString += "where " + predicateString; - BoundStatement statement = this.prepareStatement(deleteString); + BoundStatement statement = bind(deleteString, predicates); - int index = 0; - for (DbDataContainer predicate : predicates) { - CassandraClient.setValue(statement, predicate.getValue(), predicate.getGroundType(), index); - index++; - } this.session.execute(statement); } - /** - * Deletes all keys in the associated map by value - * @param predicates the predicates to match by row - * @param table the table to be deleted from - * @param columnName the column name of the map - * @param values the values in the map to be deleted - */ - public void deleteFromMapByValue(List predicates, String table, - String columnName, List values) { - String deleteString = "delete " + columnName + "[?] from " + table; - String predicateString = predicates.stream().map(predicate -> predicate.getField() + " = ? ") - .collect(Collectors.joining(" and ")); - deleteString += " where " + predicateString; - - for (DbDataContainer val : values) { - BoundStatement statement = this.prepareStatement(deleteString); - // Set the value to be deleted - CassandraClient.setValue(statement, val.getValue(), val.getGroundType(), 0); - - int index = 1; - for (DbDataContainer predicate : predicates) { - CassandraClient.setValue(statement, predicate.getValue(), predicate.getGroundType(), index); - index++; - } - - this.session.execute(statement); - } + private BoundStatement bind(String statement, List... predicates) { + BoundStatement boundStatement = this.prepareStatement(statement); + List values = Arrays.stream(predicates).flatMap(Collection::stream) + .map(p-> p.getValue()).collect(Collectors.toList()); + boundStatement.bind(values.toArray(new Object[values.size()])); + return boundStatement; } @Override @@ -351,15 +316,12 @@ private void modifySet(String table, String setName, BoundStatement statement = this.prepareStatement(query); - // Cannot use normal setValue method for collections: https://datastax-oss.atlassian.net/browse/JAVA-185 - statement.bind(values); - - int index = 1; - for (DbDataContainer container : predicates) { - CassandraClient.setValue(statement, container.getValue(), container.getGroundType(), index); - - index++; + List valuesToAdd = new ArrayList<>(); + valuesToAdd.add(values); + for (DbDataContainer dataContainer : predicates) { + valuesToAdd.add(dataContainer.getValue()); } + statement.bind(valuesToAdd.toArray(new Object[valuesToAdd.size()])); LOGGER.info("Executing update: " + statement.preparedStatement().getQueryString() + "."); this.session.execute(statement); diff --git a/scripts/cassandra/cassandra.cql b/scripts/cassandra/cassandra.cql index 18bf5839..f566e024 100644 --- a/scripts/cassandra/cassandra.cql +++ b/scripts/cassandra/cassandra.cql @@ -25,7 +25,7 @@ CREATE TABLE IF NOT EXISTS version_successor ( item_id_set set ); -create index version_successor_ind on version_successor (item_id_set); +CREATE INDEX IF NOT EXISTS version_successor_ind on version_successor (item_id_set); CREATE TABLE IF NOT EXISTS item ( id bigint PRIMARY KEY diff --git a/scripts/cassandra/truncate.cql b/scripts/cassandra/truncate.cql index 889b2401..a1d2972d 100644 --- a/scripts/cassandra/truncate.cql +++ b/scripts/cassandra/truncate.cql @@ -27,7 +27,6 @@ TRUNCATE rich_version_external_parameter; TRUNCATE rich_version; TRUNCATE structure_version; TRUNCATE structure; -TRUNCATE version_history_dag; TRUNCATE item_tag; TRUNCATE item; TRUNCATE version_successor; From af28a05a610cdf9bf95e038dd57edd9fce80ec8b Mon Sep 17 00:00:00 2001 From: Sean Lobo Date: Tue, 2 May 2017 22:45:07 -0700 Subject: [PATCH 13/14] Modify Cassandra version_successor table change item_id_set to a singular item_id --- .../CassandraVersionHistoryDagFactory.java | 21 ++++++------------ .../CassandraVersionSuccessorFactory.java | 22 +++++-------------- scripts/cassandra/cassandra.cql | 4 +--- 3 files changed, 13 insertions(+), 34 deletions(-) diff --git a/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java b/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java index 3d3c4b43..696115b3 100644 --- a/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java +++ b/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java @@ -15,13 +15,14 @@ package dao.versions.cassandra; import com.google.common.base.CaseFormat; - import dao.versions.VersionHistoryDagFactory; import db.CassandraClient; -import db.CassandraResults; -import db.DbClient; import db.DbDataContainer; import exceptions.GroundException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import models.models.Structure; import models.versions.GroundType; import models.versions.Item; @@ -29,11 +30,6 @@ import models.versions.VersionHistoryDag; import models.versions.VersionSuccessor; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - public class CassandraVersionHistoryDagFactory implements VersionHistoryDagFactory { private final CassandraClient dbClient; private final CassandraVersionSuccessorFactory versionSuccessorFactory; @@ -87,14 +83,11 @@ public void addEdge(VersionHistoryDag dag, long parentId, long childId, long ite VersionSuccessor successor = this.versionSuccessorFactory.create(parentId, childId); // Adding to the entry with id = successor.getId() - List predicate = new ArrayList<>(); + List newValue = new ArrayList<>(), predicate = new ArrayList<>(); + newValue.add(new DbDataContainer("item_id", GroundType.LONG, itemId)); predicate.add(new DbDataContainer("id", GroundType.LONG, successor.getId())); - // To add to a set column, must pass in a set containing the value(s) to add. See CasandraClient.addToSet - Set setValues = new HashSet<>(); - setValues.add(itemId); - - this.dbClient.addToSet("version_successor", "item_id_set", setValues, predicate); + this.dbClient.update(newValue, predicate, "version_successor"); dag.addEdge(parentId, childId, successor.getId()); } diff --git a/app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java b/app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java index dff65ed3..dda0c5ec 100644 --- a/app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java +++ b/app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java @@ -20,16 +20,13 @@ import db.DbClient; import db.DbDataContainer; import exceptions.GroundException; +import java.util.ArrayList; +import java.util.List; import models.versions.GroundType; import models.versions.Version; import models.versions.VersionSuccessor; import util.IdGenerator; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - public class CassandraVersionSuccessorFactory implements VersionSuccessorFactory { private final CassandraClient dbClient; private final IdGenerator idGenerator; @@ -73,11 +70,9 @@ public List> retrieveFromDatabaseByItemI CassandraResults resultSet; List> versionSuccessors = new ArrayList<>(); - resultSet = this.dbClient.selectWhereCollectionContains( - "version_successor", - DbClient.SELECT_STAR, - "item_id_set", - new DbDataContainer(null, GroundType.LONG, itemId)); + List predicate = new ArrayList<>(); + predicate.add(new DbDataContainer("item_id", GroundType.LONG, itemId)); + resultSet = this.dbClient.equalitySelect("version_successor", DbClient.SELECT_STAR, predicate); if (resultSet.isEmpty()) { throw new GroundException("No VersionSuccessor found with itemId " + itemId + "."); @@ -142,13 +137,6 @@ public void deleteFromDestination(long toId, long itemId) throws GroundException do { long dbId = resultSet.getLong("id"); - predicates.clear(); - predicates.add(new DbDataContainer("id", GroundType.LONG, dbId)); - Set value = new HashSet<>(); - value.add(itemId); - - this.dbClient.deleteFromSet("version_successor", "item_id_set", value, predicates); - predicates.clear(); predicates.add(new DbDataContainer("id", GroundType.LONG, dbId)); diff --git a/scripts/cassandra/cassandra.cql b/scripts/cassandra/cassandra.cql index f566e024..6b1048e7 100644 --- a/scripts/cassandra/cassandra.cql +++ b/scripts/cassandra/cassandra.cql @@ -22,11 +22,9 @@ CREATE TABLE IF NOT EXISTS version_successor ( id bigint PRIMARY KEY, from_version_id bigint, to_version_id bigint, - item_id_set set + item_id bigint ); -CREATE INDEX IF NOT EXISTS version_successor_ind on version_successor (item_id_set); - CREATE TABLE IF NOT EXISTS item ( id bigint PRIMARY KEY ); From a2ea3a8a9e275f98e45eb787e8390e068baf7692 Mon Sep 17 00:00:00 2001 From: Sean Lobo Date: Tue, 2 May 2017 23:31:53 -0700 Subject: [PATCH 14/14] Update CassandraClient.deleteColumn now accepts a list of columns to delete --- .../cassandra/CassandraVersionHistoryDagFactory.java | 8 ++++++-- app/db/CassandraClient.java | 10 ++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java b/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java index 696115b3..abf1abc7 100644 --- a/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java +++ b/app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java @@ -137,13 +137,17 @@ public void truncate(VersionHistoryDag dag, int numLevels, Class if (itemType.equals(Structure.class)) { predicates.add(new DbDataContainer("id", GroundType.LONG, id)); - this.dbClient.deleteColumn(predicates, "structure_version", "key_type_map"); + List columns = new ArrayList<>(); + columns.add("key_type_map"); + this.dbClient.deleteColumn(predicates, "structure_version", columns); predicates.clear(); } if (itemType.getName().toLowerCase().contains("graph")) { predicates.add(new DbDataContainer("id", GroundType.LONG, id)); - this.dbClient.deleteColumn(predicates, "graph_version", "edge_version_set"); + List columns = new ArrayList<>(); + columns.add("edge_version_set"); + this.dbClient.deleteColumn(predicates, "graph_version", columns); predicates.clear(); } diff --git a/app/db/CassandraClient.java b/app/db/CassandraClient.java index 099657ca..14f5d1f3 100644 --- a/app/db/CassandraClient.java +++ b/app/db/CassandraClient.java @@ -249,13 +249,15 @@ public void deleteFromSet(String table, String setName, } /** - * Deletes a column from a table (sets column to null) + * Deletes a column from a table * @param predicates the predicates used to match row(s) to delete from * @param table the table to delete from - * @param columnName the column to delete + * @param columnNames the columns to delete */ - public void deleteColumn(List predicates, String table, String columnName) { - String deleteString = "delete " + columnName + " from " + table + " "; + public void deleteColumn(List predicates, String table, List columnNames) { + String names = columnNames.stream().collect(Collectors.joining(", ")); + + String deleteString = "delete " + names + " from " + table + " "; String predicateString = predicates.stream().map(predicate -> predicate.getField() + " = ? ") .collect(Collectors.joining(" and "));