This patch adds parsing of the "CREATE MATERIALIZED VIEW" statement,
following the Cassandra 3 syntax. For example:
CREATE MATERIALIZED VIEW building_by_city
AS SELECT * FROM buildings
WHERE city IS NOT NULL
PRIMARY KEY(city, name);
It also adds the "IS NOT NULL" operator needed for this purpose.
Currently (and also in Cassandra), "IS NOT NULL" can only be used for
materialized view creation (not a normal SELECT), and can only be used
with the NULL operand (i.e., "IS NOT 3" will be a syntax error).
The current implementation of this statement just does some sanity
checking (such as to verify that "city" is a valid column name), and
then complains that materialized views are not yet supported:
SyntaxException: <ErrorMessage code=2000 [Syntax error in CQL query] message="Failed parsing statement: [CREATE MATERIALIZED VIEW building_by_city AS
SELECT * FROM buildings
WHERE city IS NOT NULL
PRIMARY KEY(city, name);] reason: unsupported operation: Materialized views not yet supported">
As mentioned above, the "IS NOT NULL" restriction is not allowed in
ordinary SELECT statements that do not define a materialized view:
SELECT * FROM buildings WHERE city IS NOT NULL;
InvalidRequest: code=2200 [Invalid query] message="restriction 'city IS NOT null' is only supported in materialized view creation"
Signed-off-by: Nadav Har'El <
n...@scylladb.com>
---
configure.py | 1 +
cql3/operator.hh | 1 +
cql3/relation.hh | 4 +
cql3/restrictions/statement_restrictions.hh | 5 +-
cql3/single_column_relation.hh | 5 +
cql3/statements/create_view_statement.hh | 75 ++++
cql3/operator.cc | 1 +
cql3/restrictions/statement_restrictions.cc | 38 +-
cql3/statements/create_view_statement.cc | 638 ++++++++++++++++++++++++++++
cql3/Cql.g | 43 +-
10 files changed, 807 insertions(+), 4 deletions(-)
create mode 100644 cql3/statements/create_view_statement.hh
create mode 100644 cql3/statements/create_view_statement.cc
diff --git a/configure.py b/configure.py
index b54f72d..60edfe4 100755
--- a/configure.py
+++ b/configure.py
@@ -328,6 +328,7 @@ scylla_core = (['database.cc',
'cql3/statements/authentication_statement.cc',
'cql3/statements/create_keyspace_statement.cc',
'cql3/statements/create_table_statement.cc',
+ 'cql3/statements/create_view_statement.cc',
'cql3/statements/create_type_statement.cc',
'cql3/statements/create_user_statement.cc',
'cql3/statements/drop_keyspace_statement.cc',
diff --git a/cql3/operator.hh b/cql3/operator.hh
index a67b539..dca3419 100644
--- a/cql3/operator.hh
+++ b/cql3/operator.hh
@@ -58,6 +58,7 @@ class operator_type {
static const operator_type CONTAINS;
static const operator_type CONTAINS_KEY;
static const operator_type NEQ;
+ static const operator_type IS_NOT;
private:
int32_t _b;
const operator_type& _reverse;
diff --git a/cql3/relation.hh b/cql3/relation.hh
index 9cbda71..02b286c 100644
--- a/cql3/relation.hh
+++ b/cql3/relation.hh
@@ -156,6 +156,10 @@ class relation : public enable_shared_from_this<relation> {
return new_contains_restriction(db, schema, bound_names, false);
} else if (_relation_type == operator_type::CONTAINS_KEY) {
return new_contains_restriction(db, schema, bound_names, true);
+ } else if (_relation_type == operator_type::IS_NOT) {
+ // This case is not supposed to happen: statement_restrictions
+ // constructor does not call this function for views' IS_NOT.
+ throw exceptions::invalid_request_exception(sprint("Unsupported \"IS NOT\" relation: %s", to_string()));
} else {
throw exceptions::invalid_request_exception(sprint("Unsupported \"!=\" relation: %s", to_string()));
}
diff --git a/cql3/restrictions/statement_restrictions.hh b/cql3/restrictions/statement_restrictions.hh
index 447cfc6..6e3be68 100644
--- a/cql3/restrictions/statement_restrictions.hh
+++ b/cql3/restrictions/statement_restrictions.hh
@@ -83,6 +83,8 @@ class statement_restrictions {
*/
::shared_ptr<single_column_restrictions> _nonprimary_key_restrictions;
+ std::unordered_set<const column_definition*> _not_null_columns;
+
/**
* The restrictions used to build the index expressions
*/
@@ -112,7 +114,8 @@ class statement_restrictions {
const std::vector<::shared_ptr<relation>>& where_clause,
::shared_ptr<variable_specifications> bound_names,
bool selects_only_static_columns,
- bool select_a_collection);
+ bool select_a_collection,
+ bool for_view = false);
private:
void add_restriction(::shared_ptr<restriction> restriction);
void add_single_column_restriction(::shared_ptr<single_column_restriction> restriction);
diff --git a/cql3/single_column_relation.hh b/cql3/single_column_relation.hh
index 5077fcc..9c391e2 100644
--- a/cql3/single_column_relation.hh
+++ b/cql3/single_column_relation.hh
@@ -110,6 +110,11 @@ class single_column_relation final : public relation {
::shared_ptr<term::raw> get_map_key() {
return _map_key;
}
+
+ // Returns the raw term held in _value, i.e. the value side of this
+ // single-column relation. The shared pointer is returned by value.
+ ::shared_ptr<term::raw> get_value() {
+ return _value;
+ }
+
protected:
virtual ::shared_ptr<term> to_term(const std::vector<::shared_ptr<column_specification>>& receivers,
::shared_ptr<term::raw> raw, database& db, const sstring& keyspace,
diff --git a/cql3/statements/create_view_statement.hh b/cql3/statements/create_view_statement.hh
new file mode 100644
index 0000000..4258b04
--- /dev/null
+++ b/cql3/statements/create_view_statement.hh
@@ -0,0 +1,75 @@
+/*
+ * This file is part of Scylla.
+ * Copyright (C) 2016 ScyllaDB
+ *
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <
http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "cql3/statements/schema_altering_statement.hh"
+#include "cql3/statements/cf_prop_defs.hh"
+#include "cql3/cql3_type.hh"
+#include "cql3/selection/raw_selector.hh"
+#include "cql3/relation.hh"
+#include "cql3/cf_name.hh"
+
+#include "service/migration_manager.hh"
+#include "schema.hh"
+
+#include "core/shared_ptr.hh"
+
+#include <utility>
+#include <vector>
+#include <experimental/optional>
+
+namespace cql3 {
+
+namespace statements {
+
+/**
+ * A <code>CREATE MATERIALIZED VIEW</code> parsed from a CQL query statement.
+ *
+ * The view's own name is handed to the schema_altering_statement base class;
+ * everything else that was parsed is kept in the members below.
+ * NOTE: the implementation is still a stub - the constructor currently always
+ * throws an unsupported_operation_exception.
+ */
+class create_view_statement : public schema_altering_statement {
+private:
+ // Name of the base table the view is defined over.
+ ::shared_ptr<cf_name> _base_name;
+ // Raw selectors passed to the constructor as select_clause.
+ std::vector<::shared_ptr<selection::raw_selector>> _select_clause;
+ // Relations passed to the constructor as where_clause.
+ std::vector<::shared_ptr<relation>> _where_clause;
+ // Raw column identifiers passed to the constructor as the view's
+ // partition keys and clustering keys, respectively.
+ std::vector<::shared_ptr<cql3::column_identifier::raw>> _partition_keys;
+ std::vector<::shared_ptr<cql3::column_identifier::raw>> _clustering_keys;
+ // Table properties; the constructor adds a default compression entry
+ // when none was given and a default compressor is configured.
+ const ::shared_ptr<cf_prop_defs> _properties;
+ // Stored from the constructor argument; presumably reflects an
+ // "IF NOT EXISTS" clause - TODO confirm against the grammar. It is not
+ // consulted by any enabled code yet.
+ bool _if_not_exists;
+
+public:
+ create_view_statement(
+ ::shared_ptr<cf_name> view_name,
+ ::shared_ptr<cf_name> base_name,
+ std::vector<::shared_ptr<selection::raw_selector>> select_clause,
+ std::vector<::shared_ptr<relation>> where_clause,
+ std::vector<::shared_ptr<cql3::column_identifier::raw>> partition_keys,
+ std::vector<::shared_ptr<cql3::column_identifier::raw>> clustering_keys,
+ ::shared_ptr<cf_prop_defs> properties,
+ bool if_not_exists);
+
+ // Functions we need to override to subclass schema_altering_statement
+
+ // Asks the client state for CREATE access to the statement's keyspace.
+ virtual future<> check_access(const service::client_state& state) override;
+ // Currently a no-op; the checks live in announce_migration().
+ virtual void validate(distributed<service::storage_proxy>&, const service::client_state& state) override;
+ // Checks that the base table exists in the same keyspace as the view.
+ // Does not announce anything yet.
+ virtual future<bool> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
+ // Builds a CREATED / TABLE schema-change event for this statement.
+ virtual shared_ptr<transport::event::schema_change> change_event() override;
+ // Wraps a copy of this statement in a prepared_statement.
+ virtual shared_ptr<prepared> prepare(database& db) override;
+
+ // FIXME: continue here. See create_table_statement.hh and CreateViewStatement.java
+};
+
+}
+}
diff --git a/cql3/operator.cc b/cql3/operator.cc
index 9d61387..ddec3b6 100644
--- a/cql3/operator.cc
+++ b/cql3/operator.cc
@@ -52,5 +52,6 @@ const operator_type operator_type::IN(7, operator_type::IN, "IN");
const operator_type operator_type::CONTAINS(5, operator_type::CONTAINS, "CONTAINS");
const operator_type operator_type::CONTAINS_KEY(6, operator_type::CONTAINS_KEY, "CONTAINS_KEY");
const operator_type operator_type::NEQ(8, operator_type::NEQ, "!=");
+const operator_type operator_type::IS_NOT(9, operator_type::IS_NOT, "IS NOT");
}
diff --git a/cql3/restrictions/statement_restrictions.cc b/cql3/restrictions/statement_restrictions.cc
index a9c13c0..50f18d2 100644
--- a/cql3/restrictions/statement_restrictions.cc
+++ b/cql3/restrictions/statement_restrictions.cc
@@ -28,6 +28,9 @@
#include "single_column_primary_key_restrictions.hh"
#include "token_restriction.hh"
+#include "cql3/single_column_relation.hh"
+#include "cql3/constants.hh"
+
namespace cql3 {
namespace restrictions {
@@ -132,12 +135,20 @@ statement_restrictions::statement_restrictions(schema_ptr schema)
, _nonprimary_key_restrictions(::make_shared<single_column_restrictions>(schema))
{ }
+// Resolves a raw column identifier against `schema` and looks up the matching
+// column definition through get_column_definition(). The result of that
+// lookup is returned unchanged; the constructor below null-checks the same
+// lookup, so callers should expect that it can be null.
+static const column_definition*
+to_column_definition(const schema_ptr& schema, const ::shared_ptr<column_identifier::raw>& entity) {
+    auto column_id = entity->prepare_column_identifier(schema);
+    return get_column_definition(schema, *column_id);
+}
+
+
statement_restrictions::statement_restrictions(database& db,
schema_ptr schema,
const std::vector<::shared_ptr<relation>>& where_clause,
::shared_ptr<variable_specifications> bound_names,
bool selects_only_static_columns,
- bool select_a_collection)
+ bool select_a_collection,
+ bool for_view)
: statement_restrictions(schema)
{
/*
@@ -149,7 +160,30 @@ statement_restrictions::statement_restrictions(database& db,
*/
if (!where_clause.empty()) {
for (auto&& relation : where_clause) {
- add_restriction(relation->to_restriction(db, schema, bound_names));
+ if (relation->get_operator() == cql3::operator_type::IS_NOT) {
+ single_column_relation* r =
+ dynamic_cast<single_column_relation*>(relation.get());
+ // The "IS NOT NULL" restriction is only supported (and
+ // mandatory) for materialized view creation:
+ if (!r) {
+ throw exceptions::invalid_request_exception("IS NOT only supports single column");
+ }
+ // Currently, the grammar only allows NULL as the argument of
+ // "IS NOT", so this assertion should not be able to fail.
+ assert(r->get_value() == cql3::constants::NULL_LITERAL);
+
+ auto col_id = r->get_entity()->prepare_column_identifier(schema);
+ const auto *cd = get_column_definition(schema, *col_id);
+ if (!cd) {
+ throw exceptions::invalid_request_exception(sprint("restriction '%s' unknown column %s", relation->to_string(), r->get_entity()->to_string()));
+ }
+ _not_null_columns.insert(cd);
+
+ if (!for_view)
+ throw exceptions::invalid_request_exception(sprint("restriction '%s' is only supported in materialized view creation", relation->to_string()));
+ } else {
+ add_restriction(relation->to_restriction(db, schema, bound_names));
+ }
}
}
diff --git a/cql3/statements/create_view_statement.cc b/cql3/statements/create_view_statement.cc
new file mode 100644
index 0000000..2923f16
--- /dev/null
+++ b/cql3/statements/create_view_statement.cc
@@ -0,0 +1,638 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Copyright (C) 2016 ScyllaDB
+ *
+ * Modified by ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <
http://www.gnu.org/licenses/>.
+ */
+
+
+#include <inttypes.h>
+#include <regex>
+
+#include <boost/range/adaptor/map.hpp>
+#include <boost/range/algorithm/adjacent_find.hpp>
+
+#include "cql3/statements/create_view_statement.hh"
+#include "cql3/statements/prepared_statement.hh"
+#include "schema_builder.hh"
+#include "service/storage_proxy.hh"
+
+
+namespace cql3 {
+
+namespace statements {
+
+// Builds a CREATE MATERIALIZED VIEW statement from its parsed pieces.
+//
+// view_name is forwarded to the schema_altering_statement base; the remaining
+// arguments are stored in the corresponding members. All by-value arguments
+// are sinks and are moved (not copied) into place, so they must not be used
+// inside the body after the initializer list has run.
+//
+// If no compression property was given and schema::DEFAULT_COMPRESSOR is set,
+// a default compression entry is added to the properties.
+//
+// Throws exceptions::unsupported_operation_exception unconditionally for now:
+// materialized views are parsed but not yet implemented.
+create_view_statement::create_view_statement(
+        ::shared_ptr<cf_name> view_name,
+        ::shared_ptr<cf_name> base_name,
+        std::vector<::shared_ptr<selection::raw_selector>> select_clause,
+        std::vector<::shared_ptr<relation>> where_clause,
+        std::vector<::shared_ptr<cql3::column_identifier::raw>> partition_keys,
+        std::vector<::shared_ptr<cql3::column_identifier::raw>> clustering_keys,
+        ::shared_ptr<cf_prop_defs> properties,
+        bool if_not_exists)
+    : schema_altering_statement{std::move(view_name)}
+    , _base_name{std::move(base_name)}
+    , _select_clause{std::move(select_clause)}
+    , _where_clause{std::move(where_clause)}
+    , _partition_keys{std::move(partition_keys)}
+    , _clustering_keys{std::move(clustering_keys)}
+    , _properties{std::move(properties)}
+    , _if_not_exists{if_not_exists}
+{
+    // TODO: why do we need this code? (we added this to create_table_statement constructor, why?)
+    // `properties` has been moved from, so go through the member from here on.
+    if (!_properties->has_property(cf_prop_defs::KW_COMPRESSION) && schema::DEFAULT_COMPRESSOR) {
+        std::map<sstring, sstring> compression = {
+            { sstring(compression_parameters::SSTABLE_COMPRESSION), schema::DEFAULT_COMPRESSOR.value() },
+        };
+        _properties->add_property(cf_prop_defs::KW_COMPRESSION, compression);
+    }
+    // TODO: probably need to create a "statement_restrictions" like select does
+    // based on the select_clause, base_name and where_clause; However need to
+    // pass for_view=true.
+    throw exceptions::unsupported_operation_exception("Materialized views not yet supported");
+}
+
+// FIXME: I copied the following from create_table_statement. I don't know
+// what they do or whether they need to change for create view.
+// Access check for creating a view: asks the client state whether it has the
+// CREATE permission on this statement's keyspace and returns that future.
+future<> create_view_statement::check_access(const service::client_state& state) {
+    const auto required_permission = auth::permission::CREATE;
+    return state.has_keyspace_access(keyspace(), required_permission);
+}
+
+// Intentionally empty: neither argument is used. The checks that exist so far
+// for this statement are performed in announce_migration().
+void create_view_statement::validate(distributed<service::storage_proxy>&, const service::client_state& state) {
+ // Nothing to do here; validation happens in announce_migration().
+}
+
+// Work-in-progress implementation of the schema migration for a new view.
+//
+// What the enabled code does today:
+//  - if the base table name carries no keyspace, it is given the view's
+//    keyspace;
+//  - throws invalid_request_exception if the base table's keyspace differs
+//    from the view's keyspace, if that keyspace does not exist, or if the
+//    base table does not exist in it;
+//  - otherwise returns a ready future holding `true`. Nothing is announced.
+//
+// Neither `proxy` nor `is_local_only` is used by the enabled code; the
+// database is reached through service::get_local_storage_proxy() instead.
+// NOTE(review): the exceptions above are thrown directly from this function
+// rather than being returned as a failed future - confirm that callers cope
+// with that.
+//
+// The two leading "#if 0" sections and the one after the return statement are
+// reference material (our create_table_statement code and Cassandra's Java
+// CreateViewStatement) and are never compiled.
+future<bool> create_view_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) {
+ // FIXME: this code from create_table_view is probably wrong, the Java CreateViewStatement.announceMigration is much more elaborate
+#if 0
+ ****** Our implementation in creat_table_statement (simpler code but that was simpler also in Java)
+ return make_ready_future<>().then([this, is_local_only] {
+ return service::get_local_migration_manager().announce_new_column_family(get_cf_meta_data(), is_local_only);
+ }).then_wrapped([this] (auto&& f) {
+ try {
+ f.get();
+ return true;
+ } catch (const exceptions::already_exists_exception& e) {
+ if (_if_not_exists) {
+ return false;
+ }
+ throw e;
+ }
+ });
+#endif
+#if 0
+ ***** This if 0 code is from Cassandra CreateViewStatement
+ // We need to make sure that:
+ // - primary key includes all columns in base table's primary key
+ // - make sure that the select statement does not have anything other than columns
+ // and their names match the base table's names
+ // - make sure that primary key does not include any collections
+ // - make sure there is no where clause in the select statement
+ // - make sure there is not currently a table or view
+ // - make sure baseTable gcGraceSeconds > 0
+
+ properties.validate();
+
+ if (properties.useCompactStorage)
+ throw new InvalidRequestException("Cannot use 'COMPACT STORAGE' when defining a materialized view");
+#endif
+
+ // View and base tables must be in the same keyspace, to ensure that RF
+ // is the same (because we assign a view replica to each base replica).
+ // If a keyspace was not specified for the base table name, it is assumed
+ // it is in the same keyspace as the view table being created (which
+ // itself might be the current USEd keyspace, or explicitly specified).
+ if (_base_name->get_keyspace().empty()) {
+ _base_name->set_keyspace(keyspace(), true);
+ }
+ if (_base_name->get_keyspace() != keyspace()) {
+ throw exceptions::invalid_request_exception(sprint(
+ "Cannot create a materialized view on a table in a separate keyspace ('%s' != '%s')",
+ _base_name->get_keyspace(), keyspace()));
+ }
+
+ // Validate that the keyspace and the base table exist. Still to do:
+ // reject special tables on which we cannot create views.
+ // CONTINUE HERE: something like the code below taken from migration_manager instead of the validateColumnFamily below
+ auto& db = service::get_local_storage_proxy().get_db().local();
+ if (!db.has_keyspace(_base_name->get_keyspace())) {
+ throw exceptions::invalid_request_exception(sprint(
+ "Keyspace '%s' does not exist", _base_name->get_keyspace()));
+ }
+// auto& ks = db.find_keyspace(_base_name->get_keyspace());
+ if (!db.has_schema(_base_name->get_keyspace(), _base_name->get_column_family())) {
+ throw exceptions::invalid_request_exception(sprint(
+ "Base table '%s' does not exist",
+ _base_name->get_column_family()));
+ }
+ // All checks implemented so far passed. No migration is announced yet;
+ // just report success.
+ return make_ready_future<bool>(true);
+ // Everything below this point is either commented out or compiled out.
+// if (db.has_schema(_base_name->get_keyspace(), _base_name->get_column_family())) {
+// throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name());
+// }
+#if 0
+ CFMetaData cfm = ThriftValidation.validateColumnFamily(baseName.getKeyspace(), baseName.getColumnFamily());
+
+ if (cfm.isCounter())
+ throw new InvalidRequestException("Materialized views are not supported on counter tables");
+ if (cfm.isView())
+ throw new InvalidRequestException("Materialized views cannot be created against other materialized views");
+
+ if (cfm.params.gcGraceSeconds == 0)
+ {
+ throw new InvalidRequestException(String.format("Cannot create materialized view '%s' for base table " +
+ "'%s' with gc_grace_seconds of 0, since this value is " +
+ "used to TTL undelivered updates. Setting gc_grace_seconds" +
+ " too low might cause undelivered updates to expire " +
+ "before being replayed.", cfName.getColumnFamily(),
+ baseName.getColumnFamily()));
+ }
+
+ Set<ColumnIdentifier> included = new HashSet<>();
+ for (RawSelector selector : selectClause)
+ {
+ Selectable.Raw selectable = selector.selectable;
+ if (selectable instanceof Selectable.WithFieldSelection.Raw)
+ throw new InvalidRequestException("Cannot select out a part of type when defining a materialized view");
+ if (selectable instanceof Selectable.WithFunction.Raw)
+ throw new InvalidRequestException("Cannot use function when defining a materialized view");
+ if (selectable instanceof Selectable.WritetimeOrTTL.Raw)
+ throw new InvalidRequestException("Cannot use function when defining a materialized view");
+ ColumnIdentifier identifier = (ColumnIdentifier) selectable.prepare(cfm);
+ if (selector.alias != null)
+ throw new InvalidRequestException(String.format("Cannot alias column '%s' as '%s' when defining a materialized view", identifier.toString(), selector.alias.toString()));
+
+ ColumnDefinition cdef = cfm.getColumnDefinition(identifier);
+
+ if (cdef == null)
+ throw new InvalidRequestException("Unknown column name detected in CREATE MATERIALIZED VIEW statement : "+identifier);
+
+ included.add(identifier);
+ }
+
+ Set<ColumnIdentifier.Raw> targetPrimaryKeys = new HashSet<>();
+ for (ColumnIdentifier.Raw identifier : Iterables.concat(partitionKeys, clusteringKeys))
+ {
+ if (!targetPrimaryKeys.add(identifier))
+ throw new InvalidRequestException("Duplicate entry found in PRIMARY KEY: "+identifier);
+
+ ColumnDefinition cdef = cfm.getColumnDefinition(identifier.prepare(cfm));
+
+ if (cdef == null)
+ throw new InvalidRequestException("Unknown column name detected in CREATE MATERIALIZED VIEW statement : "+identifier);
+
+ if (cfm.getColumnDefinition(identifier.prepare(cfm)).type.isMultiCell())
+ throw new InvalidRequestException(String.format("Cannot use MultiCell column '%s' in PRIMARY KEY of materialized view", identifier));
+
+ if (cdef.isStatic())
+ throw new InvalidRequestException(String.format("Cannot use Static column '%s' in PRIMARY KEY of materialized view", identifier));
+ }
+
+ // build the select statement
+ Map<ColumnIdentifier.Raw, Boolean> orderings = Collections.emptyMap();
+ SelectStatement.Parameters parameters = new SelectStatement.Parameters(orderings, false, true, false);
+ SelectStatement.RawStatement rawSelect = new SelectStatement.RawStatement(baseName, parameters, selectClause, whereClause, null, null);
+
+ ClientState state = ClientState.forInternalCalls();
+ state.setKeyspace(keyspace());
+
+ rawSelect.prepareKeyspace(state);
+ rawSelect.setBoundVariables(getBoundVariables());
+
+ ParsedStatement.Prepared prepared = rawSelect.prepare(true);
+ SelectStatement select = (SelectStatement) prepared.statement;
+ StatementRestrictions restrictions = select.getRestrictions();
+
+ if (!prepared.boundNames.isEmpty())
+ throw new InvalidRequestException("Cannot use query parameters in CREATE MATERIALIZED VIEW statements");
+
+ if (!restrictions.nonPKRestrictedColumns(false).isEmpty())
+ {
+ throw new InvalidRequestException(String.format(
+ "Non-primary key columns cannot be restricted in the SELECT statement used for materialized view " +
+ "creation (got restrictions on: %s)",
+ restrictions.nonPKRestrictedColumns(false).stream().map(def -> def.name.toString()).collect(Collectors.joining(", "))));
+ }
+
+ String whereClauseText = View.relationsToWhereClause(whereClause.relations);
+
+ Set<ColumnIdentifier> basePrimaryKeyCols = new HashSet<>();
+ for (ColumnDefinition definition : Iterables.concat(cfm.partitionKeyColumns(), cfm.clusteringColumns()))
+ basePrimaryKeyCols.add(
definition.name);
+
+ List<ColumnIdentifier> targetClusteringColumns = new ArrayList<>();
+ List<ColumnIdentifier> targetPartitionKeys = new ArrayList<>();
+
+ // This is only used as an intermediate state; this is to catch whether multiple non-PK columns are used
+ boolean hasNonPKColumn = false;
+ for (ColumnIdentifier.Raw raw : partitionKeys)
+ hasNonPKColumn |= getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetPartitionKeys, restrictions);
+
+ for (ColumnIdentifier.Raw raw : clusteringKeys)
+ hasNonPKColumn |= getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetClusteringColumns, restrictions);
+
+ // We need to include all of the primary key columns from the base table in order to make sure that we do not
+ // overwrite values in the view. We cannot support "collapsing" the base table into a smaller number of rows in
+ // the view because if we need to generate a tombstone, we have no way of knowing which value is currently being
+ // used in the view and whether or not to generate a tombstone. In order to not surprise our users, we require
+ // that they include all of the columns. We provide them with a list of all of the columns left to include.
+ boolean missingClusteringColumns = false;
+ StringBuilder columnNames = new StringBuilder();
+ List<ColumnIdentifier> includedColumns = new ArrayList<>();
+ for (ColumnDefinition def : cfm.allColumns())
+ {
+ ColumnIdentifier identifier =
def.name;
+ boolean includeDef = included.isEmpty() || included.contains(identifier);
+
+ if (includeDef && def.isStatic())
+ {
+ throw new InvalidRequestException(String.format("Unable to include static column '%s' which would be included by Materialized View SELECT * statement", identifier));
+ }
+
+ if (includeDef && !targetClusteringColumns.contains(identifier) && !targetPartitionKeys.contains(identifier))
+ {
+ includedColumns.add(identifier);
+ }
+ if (!def.isPrimaryKeyColumn()) continue;
+
+ if (!targetClusteringColumns.contains(identifier) && !targetPartitionKeys.contains(identifier))
+ {
+ if (missingClusteringColumns)
+ columnNames.append(',');
+ else
+ missingClusteringColumns = true;
+ columnNames.append(identifier);
+ }
+ }
+ if (missingClusteringColumns)
+ throw new InvalidRequestException(String.format("Cannot create Materialized View %s without primary key columns from base %s (%s)",
+ columnFamily(), baseName.getColumnFamily(), columnNames.toString()));
+
+ if (targetPartitionKeys.isEmpty())
+ throw new InvalidRequestException("Must select at least a column for a Materialized View");
+
+ if (targetClusteringColumns.isEmpty())
+ throw new InvalidRequestException("No columns are defined for Materialized View other than primary key");
+
+ CFMetaData.Builder cfmBuilder = CFMetaData.Builder.createView(keyspace(), columnFamily());
+ add(cfm, targetPartitionKeys, cfmBuilder::addPartitionKey);
+ add(cfm, targetClusteringColumns, cfmBuilder::addClusteringColumn);
+ add(cfm, includedColumns, cfmBuilder::addRegularColumn);
+ cfmBuilder.withId(properties.properties.getId());
+ TableParams params = properties.properties.asNewTableParams();
+ CFMetaData viewCfm = cfmBuilder.build().params(params);
+ ViewDefinition definition = new ViewDefinition(keyspace(),
+ columnFamily(),
+ Schema.instance.getId(keyspace(), baseName.getColumnFamily()),
+ baseName.getColumnFamily(),
+ included.isEmpty(),
+ rawSelect,
+ whereClauseText,
+ viewCfm);
+
+ try
+ {
+ MigrationManager.announceNewView(definition, isLocalOnly);
+ return new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, keyspace(), columnFamily());
+ }
+ catch (AlreadyExistsException e)
+ {
+ if (ifNotExists)
+ return null;
+ throw e;
+ }
+#endif
+}
+
+// Builds the schema-change event reported for this statement: a CREATED
+// change with target type TABLE, carrying the statement's keyspace and
+// column family names.
+shared_ptr<transport::event::schema_change> create_view_statement::change_event() {
+    // FIXME: this is probably wrong, I just copied it from create_table_statement
+    using event_t = transport::event::schema_change;
+    return make_shared<event_t>(
+            event_t::change_type::CREATED,
+            event_t::target_type::TABLE,
+            keyspace(),
+            column_family());
+}
+
+// Produces the prepared form of this statement: a copy of *this is placed in
+// a shared pointer and wrapped in a prepared_statement. `db` is not used.
+shared_ptr<cql3::statements::prepared_statement>
+create_view_statement::prepare(database& db) {
+    // FIXME: this is probably wrong, I just copied it
+    auto statement_copy = make_shared<create_view_statement>(*this);
+    return make_shared<prepared_statement>(std::move(statement_copy));
+}
+
+
+#if 0
+/**
+ * Builds the schema described by this <code>CREATE</code> statement.
+ *
+ * A schema_builder is created for the statement's keyspace and column
+ * family, populated through apply_properties_to(), and then built with
+ * compact storage switched on or off according to _use_compact_storage.
+ *
+ * @return the schema produced by the builder
+ */
+schema_ptr create_table_statement::get_cf_meta_data() {
+    schema_builder builder{keyspace(), column_family()};
+    apply_properties_to(builder);
+    const auto storage_mode = _use_compact_storage
+            ? schema_builder::compact_storage::yes
+            : schema_builder::compact_storage::no;
+    return builder.build(storage_mode);
+}
+
+// Feeds everything this statement knows about the table into `builder`:
+// first every column returned by get_columns(), then the partition-key and
+// clustering-key columns derived from the alias/type vectors, and finally the
+// table properties. The nested "#if 0" fragments are leftovers of the Java
+// original and are not compiled.
+void create_table_statement::apply_properties_to(schema_builder& builder) {
+ auto&& columns = get_columns();
+ for (auto&& column : columns) {
+ builder.with_column(column);
+ }
+#if 0
+ cfmd.defaultValidator(defaultValidator)
+ .addAllColumnDefinitions(getColumns(cfmd))
+#endif
+ // Key columns: _key_aliases are registered as partition-key columns,
+ // _column_aliases as clustering-key columns.
+ add_column_metadata_from_aliases(builder, _key_aliases, _partition_key_types, column_kind::partition_key);
+ add_column_metadata_from_aliases(builder, _column_aliases, _clustering_key_types, column_kind::clustering_key);
+#if 0
+ if (valueAlias != null)
+ addColumnMetadataFromAliases(cfmd, Collections.singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
+#endif
+
+ _properties->apply_to_builder(builder);
+}
+
+// Registers one column of the given kind with `builder` for every non-empty
+// alias, pairing aliases[i] with types[i]. Both vectors must have the same
+// length (asserted). Empty aliases are skipped.
+void create_table_statement::add_column_metadata_from_aliases(schema_builder& builder, std::vector<bytes> aliases, const std::vector<data_type>& types, column_kind kind)
+{
+    assert(aliases.size() == types.size());
+    for (size_t idx = 0; idx < aliases.size(); ++idx) {
+        if (aliases[idx].empty()) {
+            continue;
+        }
+        builder.with_column(aliases[idx], types[idx], kind);
+    }
+}
+
+// Constructs the raw (not yet prepared) statement: the table name is moved
+// into the cf_statement base and the if_not_exists flag is stored as given.
+create_table_statement::raw_statement::raw_statement(::shared_ptr<cf_name> name, bool if_not_exists)
+ : cf_statement{std::move(name)}
+ , _if_not_exists{if_not_exists}
+{ }
+
+::shared_ptr<parsed_statement::prepared> create_table_statement::raw_statement::prepare(database& db) {
+ // Column family name
+ const sstring& cf_name = _cf_name->get_column_family();
+ std::regex name_regex("\\w+");
+ if (!std::regex_match(std::string(cf_name), name_regex)) {
+ throw exceptions::invalid_request_exception(sprint("\"%s\" is not a valid table name (must be alphanumeric character only: [0-9A-Za-z]+)", cf_name.c_str()));
+ }
+ if (cf_name.size() > size_t(schema::NAME_LENGTH)) {
+ throw exceptions::invalid_request_exception(sprint("Table names shouldn't be more than %d characters long (got \"%s\")", schema::NAME_LENGTH, cf_name.c_str()));
+ }
+
+ // Check for duplicate column names
+ auto i = boost::range::adjacent_find(_defined_names, [] (auto&& e1, auto&& e2) {
+ return e1->text() == e2->text();
+ });
+ if (i != _defined_names.end()) {
+ throw exceptions::invalid_request_exception(sprint("Multiple definition of identifier %s", (*i)->text()));
+ }
+
+ properties->validate();
+
+ auto stmt = ::make_shared<create_table_statement>(_cf_name, properties, _if_not_exists, _static_columns);
+
+ std::experimental::optional<std::map<bytes, data_type>> defined_multi_cell_collections;
+ for (auto&& entry : _definitions) {
+ ::shared_ptr<column_identifier> id = entry.first;
+ ::shared_ptr<cql3_type> pt = entry.second->prepare(db, keyspace());
+ // FIXME: remove this check once we support counters
+ if (pt->is_counter()) {
+ fail(unimplemented::cause::COUNTERS);
+ }
+ if (pt->is_collection() && pt->get_type()->is_multi_cell()) {
+ if (!defined_multi_cell_collections) {
+ defined_multi_cell_collections = std::map<bytes, data_type>{};
+ }
+ defined_multi_cell_collections->emplace(id->name(), pt->get_type());
+ }
+ stmt->_columns.emplace(id, pt->get_type()); // we'll remove what is not a column below
+ }
+ if (_key_aliases.empty()) {
+ throw exceptions::invalid_request_exception("No PRIMARY KEY specifed (exactly one required)");
+ } else if (_key_aliases.size() > 1) {
+ throw exceptions::invalid_request_exception("Multiple PRIMARY KEYs specifed (exactly one required)");
+ }
+
+ stmt->_use_compact_storage = _use_compact_storage;
+
+ auto& key_aliases = _key_aliases[0];
+ std::vector<data_type> key_types;
+ for (auto&& alias : key_aliases) {
+ stmt->_key_aliases.emplace_back(alias->name());
+ auto t = get_type_and_remove(stmt->_columns, alias);
+ if (t->is_counter()) {
+ throw exceptions::invalid_request_exception(sprint("counter type is not supported for PRIMARY KEY part %s", alias->text()));
+ }
+ if (_static_columns.count(alias) > 0) {
+ throw exceptions::invalid_request_exception(sprint("Static column %s cannot be part of the PRIMARY KEY", alias->text()));
+ }
+ key_types.emplace_back(t);
+ }
+ stmt->_partition_key_types = key_types;
+
+ // Handle column aliases
+ if (_column_aliases.empty()) {
+ if (_use_compact_storage) {
+ // There should remain some column definition since it is a non-composite "static" CF
+ if (stmt->_columns.empty()) {
+ throw exceptions::invalid_request_exception("No definition found that is not part of the PRIMARY KEY");
+ }
+ if (defined_multi_cell_collections) {
+ throw exceptions::invalid_request_exception("Non-frozen collection types are not supported with COMPACT STORAGE");
+ }
+ }
+ stmt->_clustering_key_types = std::vector<data_type>{};
+ } else {
+ // If we use compact storage and have only one alias, it is a
+ // standard "dynamic" CF, otherwise it's a composite
+ if (_use_compact_storage && _column_aliases.size() == 1) {
+ if (defined_multi_cell_collections) {
+ throw exceptions::invalid_request_exception("Collection types are not supported with COMPACT STORAGE");
+ }
+ auto alias = _column_aliases[0];
+ if (_static_columns.count(alias) > 0) {
+ throw exceptions::invalid_request_exception(sprint("Static column %s cannot be part of the PRIMARY KEY", alias->text()));
+ }
+ stmt->_column_aliases.emplace_back(alias->name());
+ auto at = get_type_and_remove(stmt->_columns, alias);
+ if (at->is_counter()) {
+ throw exceptions::invalid_request_exception(sprint("counter type is not supported for PRIMARY KEY part %s", stmt->_column_aliases[0]));
+ }
+ stmt->_clustering_key_types.emplace_back(at);
+ } else {
+ std::vector<data_type> types;
+ for (auto&& t : _column_aliases) {
+ stmt->_column_aliases.emplace_back(t->name());
+ auto type = get_type_and_remove(stmt->_columns, t);
+ if (type->is_counter()) {
+ throw exceptions::invalid_request_exception(sprint("counter type is not supported for PRIMARY KEY part %s", t->text()));
+ }
+ if (_static_columns.count(t) > 0) {
+ throw exceptions::invalid_request_exception(sprint("Static column %s cannot be part of the PRIMARY KEY", t->text()));
+ }
+ types.emplace_back(type);
+ }
+
+ if (_use_compact_storage) {
+ if (defined_multi_cell_collections) {
+ throw exceptions::invalid_request_exception("Collection types are not supported with COMPACT STORAGE");
+ }
+ stmt->_clustering_key_types = types;
+ } else {
+ stmt->_clustering_key_types = types;
+ }
+ }
+ }
+
+ if (!_static_columns.empty()) {
+ // Only CQL3 tables can have static columns
+ if (_use_compact_storage) {
+ throw exceptions::invalid_request_exception("Static columns are not supported in COMPACT STORAGE tables");
+ }
+ // Static columns only make sense if we have at least one clustering column. Otherwise everything is static anyway
+ if (_column_aliases.empty()) {
+ throw exceptions::invalid_request_exception("Static columns are only useful (and thus allowed) if the table has at least one clustering column");
+ }
+ }
+
+ if (_use_compact_storage && !stmt->_column_aliases.empty()) {
+ if (stmt->_columns.empty()) {
+#if 0
+ // The only value we'll insert will be the empty one, so the default validator don't matter
+ stmt.defaultValidator = BytesType.instance;
+ // We need to distinguish between
+ // * I'm upgrading from thrift so the valueAlias is null
+ // * I've defined my table with only a PK (and the column value will be empty)
+ // So, we use an empty valueAlias (rather than null) for the second case
+ stmt.valueAlias = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+#endif
+ } else {
+ if (stmt->_columns.size() > 1) {
+ throw exceptions::invalid_request_exception(sprint("COMPACT STORAGE with composite PRIMARY KEY allows no more than one column not part of the PRIMARY KEY (got: %s)",
+ ::join( ", ", stmt->_columns | boost::adaptors::map_keys)));
+ }
+#if 0
+ Map.Entry<ColumnIdentifier, AbstractType> lastEntry = stmt.columns.entrySet().iterator().next();
+ stmt.defaultValidator = lastEntry.getValue();
+ stmt.valueAlias = lastEntry.getKey().bytes;
+ stmt.columns.remove(lastEntry.getKey());
+#endif
+ }
+ } else {
+ // For compact, we are in the "static" case, so we need at least one column defined. For non-compact however, having
+ // just the PK is fine since we have CQL3 row marker.
+ if (_use_compact_storage && stmt->_columns.empty()) {
+ throw exceptions::invalid_request_exception("COMPACT STORAGE with non-composite PRIMARY KEY require one column not part of the PRIMARY KEY, none given");
+ }
+#if 0
+ // There is no way to insert/access a column that is not defined for non-compact storage, so
+ // the actual validator don't matter much (except that we want to recognize counter CF as limitation apply to them).
+ stmt.defaultValidator = !stmt.columns.isEmpty() && (stmt.columns.values().iterator().next() instanceof CounterColumnType)
+ ? CounterColumnType.instance
+ : BytesType.instance;
+#endif
+ }
+
+ // If we give a clustering order, we must explicitly do so for all aliases and in the order of the PK
+ if (!_defined_ordering.empty()) {
+ if (_defined_ordering.size() > _column_aliases.size()) {
+ throw exceptions::invalid_request_exception("Only clustering key columns can be defined in CLUSTERING ORDER directive");
+ }
+
+ int i = 0;
+ for (auto& pair: _defined_ordering){
+ auto& id = pair.first;
+ auto& c = _column_aliases.at(i);
+
+ if (!(*id == *c)) {
+ if (find_ordering_info(c)) {
+ throw exceptions::invalid_request_exception(sprint("The order of columns in the CLUSTERING ORDER directive must be the one of the clustering key (%s must appear before %s)", c, id));
+ } else {
+ throw exceptions::invalid_request_exception(sprint("Missing CLUSTERING ORDER for column %s", c));
+ }
+ }
+ ++i;
+ }
+ }
+
+ return ::make_shared<parsed_statement::prepared>(stmt);
+}
+
+// Looks up the definition `t` in `columns`, erases that entry, and returns
+// the type to use for it as a PRIMARY KEY component.
+//
+// Throws exceptions::invalid_request_exception when `t` is not present in
+// `columns`, or when its type is a multi-cell collection.
+//
+// When find_ordering_info(t) yields a value that is true, the type is wrapped
+// with reversed_type_impl::get_instance(); otherwise it is returned as found.
+data_type create_table_statement::raw_statement::get_type_and_remove(column_map_type& columns, ::shared_ptr<column_identifier> t)
+{
+    auto entry = columns.find(t);
+    if (entry == columns.end()) {
+        throw exceptions::invalid_request_exception(sprint("Unknown definition %s referenced in PRIMARY KEY", t->text()));
+    }
+
+    // Copy the type out before the entry is erased below.
+    auto column_type = entry->second;
+    const bool multi_cell_collection = column_type->is_collection() && column_type->is_multi_cell();
+    if (multi_cell_collection) {
+        throw exceptions::invalid_request_exception(sprint("Invalid collection type for PRIMARY KEY component %s", t->text()));
+    }
+    columns.erase(t);
+
+    auto ordering = find_ordering_info(t);
+    if (ordering && *ordering) {
+        return reversed_type_impl::get_instance(column_type);
+    }
+    return column_type;
+}
+
+// Records one column definition: `def` goes into _defined_names, the pair
+// (def, type) goes into _definitions, and, only when `is_static` is set,
+// `def` is also added to _static_columns.
+void create_table_statement::raw_statement::add_definition(::shared_ptr<column_identifier> def, ::shared_ptr<cql3_type::raw> type, bool is_static) {
+    this->_defined_names.emplace(def);
+    this->_definitions.emplace(def, type);
+    if (!is_static) {
+        return;
+    }
+    this->_static_columns.emplace(def);
+}
+
+// Appends one whole list of key aliases as a single new element of
+// _key_aliases.
+void create_table_statement::raw_statement::add_key_aliases(const std::vector<::shared_ptr<column_identifier>> aliases)
+{
+    this->_key_aliases.emplace_back(aliases);
+}
+
+// Appends `alias` to the end of _column_aliases.
+void create_table_statement::raw_statement::add_column_alias(::shared_ptr<column_identifier> alias)
+{
+    this->_column_aliases.emplace_back(alias);
+}
+
+// Appends an (alias, reversed) entry to _defined_ordering; entries keep the
+// order in which this function is called.
+void create_table_statement::raw_statement::set_ordering(::shared_ptr<column_identifier> alias, bool reversed)
+{
+    this->_defined_ordering.emplace_back(alias, reversed);
+}
+
+// Sets the _use_compact_storage flag on this raw statement.
+void create_table_statement::raw_statement::set_compact_storage()
+{
+    this->_use_compact_storage = true;
+}
+#endif
+
+}
+
+}
diff --git a/cql3/Cql.g b/cql3/Cql.g
index f5b2c0f..b4ea1b5 100644
--- a/cql3/Cql.g
+++ b/cql3/Cql.g
@@ -40,6 +40,7 @@ options {
#include "cql3/statements/drop_keyspace_statement.hh"
#include "cql3/statements/create_index_statement.hh"
#include "cql3/statements/create_table_statement.hh"
+#include "cql3/statements/create_view_statement.hh"
#include "cql3/statements/create_type_statement.hh"
#include "cql3/statements/drop_type_statement.hh"
#include "cql3/statements/alter_type_statement.hh"
@@ -340,6 +341,7 @@ cqlStatement returns [shared_ptr<raw::parsed_statement> stmt]
| st30=createAggregateStatement { $stmt = st30; }
| st31=dropAggregateStatement { $stmt = st31; }
#endif
+ | st32=createViewStatement { $stmt = st32; }
;
/*
@@ -787,6 +789,41 @@ indexIdent returns [::shared_ptr<index_target::raw> id]
| K_FULL '(' c=cident ')' { $id = index_target::raw::full_collection(c); }
;
+/**
+ * CREATE MATERIALIZED VIEW [IF NOT EXISTS] <viewName> AS
+ * SELECT <columns>
+ * FROM <CF>
+ * WHERE <pkColumns> IS NOT NULL
+ * PRIMARY KEY (<pkColumns>)
+ * WITH <property> = <value> AND ...;
+ *
+ * Notes on what this rule accepts:
+ * - IF NOT EXISTS is optional; when present, if_not_exists is set to true.
+ * - The WHERE clause and the WITH clause are both optional in the grammar.
+ * - PRIMARY KEY takes one of two forms:
+ *     ((k1, k2, ...), c1, c2, ...)  the parenthesised inner list is collected
+ *                                   into partition_keys, the rest into
+ *                                   composite_keys;
+ *     (k1, c1, c2, ...)             the first identifier is the only entry in
+ *                                   partition_keys, the rest go into
+ *                                   composite_keys.
+ * - The result is a create_view_statement built from the view name, the base
+ *   table name, the select clause, the where clause, both key lists, the
+ *   properties and the if_not_exists flag.
+ */
+createViewStatement returns [::shared_ptr<create_view_statement> expr]
+ @init {
+ bool if_not_exists = false;
+ std::vector<::shared_ptr<cql3::column_identifier::raw>> partition_keys;
+ std::vector<::shared_ptr<cql3::column_identifier::raw>> composite_keys;
+ auto props = make_shared<cf_prop_defs>();
+ }
+ : K_CREATE K_MATERIALIZED K_VIEW (K_IF K_NOT K_EXISTS { if_not_exists = true; })? cf=columnFamilyName K_AS
+ K_SELECT sclause=selectClause K_FROM basecf=columnFamilyName
+ (K_WHERE wclause=whereClause)?
+ K_PRIMARY K_KEY (
+ '(' '(' k1=cident { partition_keys.push_back(k1); } ( ',' kn=cident { partition_keys.push_back(kn); } )* ')' ( ',' c1=cident { composite_keys.push_back(c1); } )* ')'
+ | '(' k1=cident { partition_keys.push_back(k1); } ( ',' cn=cident { composite_keys.push_back(cn); } )* ')'
+ )
+ (K_WITH properties[props])?
+ {
+ $expr = ::make_shared<create_view_statement>(
+ std::move(cf),
+ std::move(basecf),
+ std::move(sclause),
+ std::move(wclause),
+ std::move(partition_keys),
+ std::move(composite_keys),
+ std::move(props),
+ if_not_exists);
+ }
+ ;
#if 0
/**
@@ -1304,7 +1341,8 @@ relation[std::vector<cql3::relation_ptr>& clauses]
| K_TOKEN l=tupleOfIdentifiers type=relationType t=term
{ $clauses.emplace_back(::make_shared<cql3::token_relation>(std::move(l), *type, std::move(t))); }
-
+ | name=cident K_IS K_NOT K_NULL {
+ $clauses.emplace_back(make_shared<cql3::single_column_relation>(std::move(name), cql3::operator_type::IS_NOT, cql3::constants::NULL_LITERAL)); }
| name=cident K_IN marker=inMarker
{ $clauses.emplace_back(make_shared<cql3::single_column_relation>(std::move(name), cql3::operator_type::IN, std::move(marker))); }
| name=cident K_IN in_values=singleColumnInValues
@@ -1528,6 +1566,8 @@ K_KEYSPACE: ( K E Y S P A C E
K_KEYSPACES: K E Y S P A C E S;
K_COLUMNFAMILY:( C O L U M N F A M I L Y
| T A B L E );
+K_MATERIALIZED:M A T E R I A L I Z E D;
+K_VIEW: V I E W;
K_INDEX: I N D E X;
K_CUSTOM: C U S T O M;
K_ON: O N;
@@ -1551,6 +1591,7 @@ K_DESC: D E S C;
K_ALLOW: A L L O W;
K_FILTERING: F I L T E R I N G;
K_IF: I F;
+K_IS: I S;
K_CONTAINS: C O N T A I N S;
K_GRANT: G R A N T;
--
2.7.4