[COMMIT scylladb master] Merge 'Remove dead code from migration_manager and schema_tables' from Benny Halevy

0 views
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
Jun 28, 2024, 10:08:45 AM (2 days ago) Jun 28
to scylladb-dev@googlegroups.com, Kamil Braun
From: Kamil Braun <kbr...@scylladb.com>
Committer: Kamil Braun <kbr...@scylladb.com>
Branch: master

Merge 'Remove dead code from migration_manager and schema_tables' from Benny Halevy

This short series removed some ancient legacy code from
migration_manager and schema_tables, before I make further changes in this area.

We have more such code under the cql3 hierarchy but it can be dealt with as a follow up.

No backport required

Closes scylladb/scylladb#19530

* github.com:scylladb/scylladb:
schema_tables: remove dead code
migration_manager: remove dead code

---
diff --git a/db/schema_tables.cc b/db/schema_tables.cc
--- a/db/schema_tables.cc
+++ b/db/schema_tables.cc
@@ -755,20 +755,6 @@ schema_ptr scylla_table_schema_history() {

}

-#if 0
- public static void truncateSchemaTables()
- {
- for (String table : ALL)
- getSchemaCFS(table).truncateBlocking();
- }
-
- private static void flushSchemaTables()
- {
- for (String table : ALL)
- SystemKeyspace.forceBlockingFlush(table);
- }
-#endif
-
static
mutation
redact_columns_for_missing_features(mutation&& m, schema_features features) {
@@ -1881,61 +1867,6 @@ static std::vector<data_value> read_arg_values(const query::result_set_row& row)
return std::vector<data_value>(args.begin(), args.end());
}

-#if 0
- // see the comments for mergeKeyspaces()
- private static void mergeAggregates(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
- {
- List<UDAggregate> created = new ArrayList<>();
- List<UDAggregate> altered = new ArrayList<>();
- List<UDAggregate> dropped = new ArrayList<>();
-
- MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
-
- // New keyspace with functions
- for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
- if (entry.getValue().hasColumns())
- created.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), entry.getValue())).values());
-
- for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
- {
- ColumnFamily pre = entry.getValue().leftValue();
- ColumnFamily post = entry.getValue().rightValue();
-
- if (pre.hasColumns() && post.hasColumns())
- {
- MapDifference<ByteBuffer, UDAggregate> delta =
- Maps.difference(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), pre)),
- createAggregatesFromAggregatesPartition(new Row(entry.getKey(), post)));
-
- dropped.addAll(delta.entriesOnlyOnLeft().values());
- created.addAll(delta.entriesOnlyOnRight().values());
- Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDAggregate>, UDAggregate>()
- {
- public UDAggregate apply(MapDifference.ValueDifference<UDAggregate> pair)
- {
- return pair.rightValue();
- }
- }));
- }
- else if (pre.hasColumns())
- {
- dropped.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), pre)).values());
- }
- else if (post.hasColumns())
- {
- created.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), post)).values());
- }
- }
-
- for (UDAggregate udf : created)
- Schema.instance.addAggregate(udf);
- for (UDAggregate udf : altered)
- Schema.instance.updateAggregate(udf);
- for (UDAggregate udf : dropped)
- Schema.instance.dropAggregate(udf);
- }
-#endif
-
static seastar::future<shared_ptr<cql3::functions::user_function>> create_func(replica::database& db, const query::result_set_row& row) {
cql3::functions::function_name name{
row.get_nonnull<sstring>("keyspace_name"), row.get_nonnull<sstring>("function_name")};
@@ -2863,18 +2794,6 @@ std::vector<mutation> make_update_table_mutations(replica::database& db,
make_update_columns_mutations(std::move(old_table), std::move(new_table), timestamp, mutations);

warn(unimplemented::cause::TRIGGERS);
-#if 0
- MapDifference<String, TriggerDefinition> triggerDiff = Maps.difference(oldTable.getTriggers(), newTable.getTriggers());
-
- // dropped triggers
- for (TriggerDefinition trigger : triggerDiff.entriesOnlyOnLeft().values())
- dropTriggerFromSchemaMutation(oldTable, trigger, timestamp, mutation);
-
- // newly created triggers
- for (TriggerDefinition trigger : triggerDiff.entriesOnlyOnRight().values())
- addTriggerToSchemaMutation(newTable, trigger, timestamp, mutation);
-
-#endif
return mutations;
}

@@ -2910,15 +2829,6 @@ std::vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata>
std::vector<mutation> mutations;
make_drop_table_or_view_mutations(tables(), std::move(table), timestamp, mutations);

-#if 0
- for (TriggerDefinition trigger : table.getTriggers().values())
- dropTriggerFromSchemaMutation(table, trigger, timestamp, mutation);
-
- // TODO: get rid of in #6717
- ColumnFamily indexCells = mutation.addOrGet(SystemKeyspace.BuiltIndexes);
- for (String indexName : Keyspace.open(keyspace.name).getColumnFamilyStore(table.cfName).getBuiltIndexes())
- indexCells.addTombstone(indexCells.getComparator().makeCellName(indexName), ldt, timestamp);
-#endif
return mutations;
}

@@ -2934,19 +2844,6 @@ static future<schema_mutations> read_table_mutations(distributed<service::storag
[&] { return read_schema_partition_for_table(proxy, scylla_tables(), table.keyspace_name, table.table_name); }
);
co_return schema_mutations{std::move(cf_m), std::move(col_m), std::move(vv_col_m), std::move(c_col_m), std::move(idx_m), std::move(dropped_m), std::move(st_m)};
-#if 0
- // FIXME:
- Row serializedTriggers = readSchemaPartitionForTable(TRIGGERS, ksName, cfName);
- try
- {
- for (TriggerDefinition trigger : createTriggersFromTriggersPartition(serializedTriggers))
- cfm.addTriggerDefinition(trigger);
- }
- catch (InvalidRequestException e)
- {
- throw new RuntimeException(e);
- }
-#endif
}

future<schema_ptr> create_table_from_name(distributed<service::storage_proxy>& proxy, const sstring& keyspace, const sstring& table)
@@ -2974,14 +2871,6 @@ future<std::map<sstring, schema_ptr>> create_tables_from_tables_partition(distri
co_return std::move(tables);
}

-#if 0
- public static CFMetaData createTableFromTablePartitionAndColumnsPartition(Row serializedTable, Row serializedColumns)
- {
- String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
- return createTableFromTableRowAndColumnsPartition(QueryProcessor.resultify(query, serializedTable).one(), serializedColumns);
- }
-#endif
-
/**
* Deserialize table metadata from low-level representation
*
@@ -3555,150 +3444,6 @@ std::vector<mutation> make_drop_view_mutations(lw_shared_ptr<keyspace_metadata>
return mutations;
}

-#if 0
- private static AbstractType<?> getComponentComparator(AbstractType<?> rawComparator, Integer componentIndex)
- {
- return (componentIndex == null || (componentIndex == 0 && !(rawComparator instanceof CompositeType)))
- ? rawComparator
- : ((CompositeType)rawComparator).types.get(componentIndex);
- }
-
- /*
- * Trigger metadata serialization/deserialization.
- */
-
- private static void addTriggerToSchemaMutation(CFMetaData table, TriggerDefinition trigger, long timestamp, Mutation mutation)
- {
- ColumnFamily cells = mutation.addOrGet(Triggers);
- Composite prefix = Triggers.comparator.make(table.cfName, trigger.name);
- CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
- adder.addMapEntry("trigger_options", "class", trigger.classOption);
- }
-
- private static void dropTriggerFromSchemaMutation(CFMetaData table, TriggerDefinition trigger, long timestamp, Mutation mutation)
- {
- ColumnFamily cells = mutation.addOrGet(Triggers);
- int ldt = (int) (System.currentTimeMillis() / 1000);
-
- Composite prefix = Triggers.comparator.make(table.cfName, trigger.name);
- cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
- }
-
- /**
- * Deserialize triggers from storage-level representation.
- *
- * @param partition storage-level partition containing the trigger definitions
- * @return the list of processed TriggerDefinitions
- */
- private static List<TriggerDefinition> createTriggersFromTriggersPartition(Row partition)
- {
- List<TriggerDefinition> triggers = new ArrayList<>();
- String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, TRIGGERS);
- for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
- {
- String name = row.getString("trigger_name");
- String classOption = row.getMap("trigger_options", UTF8Type.instance, UTF8Type.instance).get("class");
- triggers.add(new TriggerDefinition(name, classOption));
- }
- return triggers;
- }
-
- /*
- * Aggregate UDF metadata serialization/deserialization.
- */
-
- public static Mutation makeCreateAggregateMutation(KSMetaData keyspace, UDAggregate aggregate, long timestamp)
- {
- // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
- Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
- addAggregateToSchemaMutation(aggregate, timestamp, mutation);
- return mutation;
- }
-
- private static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation)
- {
- ColumnFamily cells = mutation.addOrGet(Aggregates);
- Composite prefix = Aggregates.comparator.make(aggregate.name().name, UDHelper.calculateSignature(aggregate));
- CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
-
- adder.resetCollection("argument_types");
- adder.add("return_type", aggregate.returnType().toString());
- adder.add("state_func", aggregate.stateFunction().name().name);
- if (aggregate.stateType() != null)
- adder.add("state_type", aggregate.stateType().toString());
- if (aggregate.finalFunction() != null)
- adder.add("final_func", aggregate.finalFunction().name().name);
- if (aggregate.initialCondition() != null)
- adder.add("initcond", aggregate.initialCondition());
-
- for (AbstractType<?> argType : aggregate.argTypes())
- adder.addListEntry("argument_types", argType.toString());
- }
-
- private static Map<ByteBuffer, UDAggregate> createAggregatesFromAggregatesPartition(Row partition)
- {
- Map<ByteBuffer, UDAggregate> aggregates = new HashMap<>();
- String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, AGGREGATES);
- for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
- {
- UDAggregate aggregate = createAggregateFromAggregateRow(row);
- aggregates.put(UDHelper.calculateSignature(aggregate), aggregate);
- }
- return aggregates;
- }
-
- private static UDAggregate createAggregateFromAggregateRow(UntypedResultSet.Row row)
- {
- String ksName = row.getString("keyspace_name");
- String functionName = row.getString("aggregate_name");
- FunctionName name = new FunctionName(ksName, functionName);
-
- List<String> types = row.getList("argument_types", UTF8Type.instance);
-
- List<AbstractType<?>> argTypes;
- if (types == null)
- {
- argTypes = Collections.emptyList();
- }
- else
- {
- argTypes = new ArrayList<>(types.size());
- for (String type : types)
- argTypes.add(parseType(type));
- }
-
- AbstractType<?> returnType = parseType(row.getString("return_type"));
-
- FunctionName stateFunc = new FunctionName(ksName, row.getString("state_func"));
- FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null;
- AbstractType<?> stateType = row.has("state_type") ? parseType(row.getString("state_type")) : null;
- ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null;
-
- try
- {
- return UDAggregate.create(name, argTypes, returnType, stateFunc, finalFunc, stateType, initcond);
- }
- catch (InvalidRequestException reason)
- {
- return UDAggregate.createBroken(name, argTypes, returnType, initcond, reason);
- }
- }
-
- public static Mutation makeDropAggregateMutation(KSMetaData keyspace, UDAggregate aggregate, long timestamp)
- {
- // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
- Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
-
- ColumnFamily cells = mutation.addOrGet(Aggregates);
- int ldt = (int) (System.currentTimeMillis() / 1000);
-
- Composite prefix = Aggregates.comparator.make(aggregate.name().name, UDHelper.calculateSignature(aggregate));
- cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-
- return mutation;
- }
-#endif
-
data_type parse_type(sstring str)
{
return db::marshal::type_parser::parse(str);
diff --git a/service/migration_manager.cc b/service/migration_manager.cc
--- a/service/migration_manager.cc
+++ b/service/migration_manager.cc
@@ -473,20 +473,6 @@ future<> migration_notifier::create_view(view_ptr view) {
});
}

-#if 0
-public void notifyCreateFunction(UDFunction udf)
-{
- for (IMigrationListener listener : listeners)
- listener.onCreateFunction(udf.name().keyspace, udf.name().name);
-}
-
-public void notifyCreateAggregate(UDAggregate udf)
-{
- for (IMigrationListener listener : listeners)
- listener.onCreateAggregate(udf.name().keyspace, udf.name().name);
-}
-#endif
-
future<> migration_notifier::update_keyspace(lw_shared_ptr<keyspace_metadata> ksm) {
const auto& name = ksm->name();
co_await on_schema_change([&] (migration_listener* listener) {
@@ -534,20 +520,6 @@ future<> migration_notifier::update_tablet_metadata() {
});
}

-#if 0
-public void notifyUpdateFunction(UDFunction udf)
-{
- for (IMigrationListener listener : listeners)
- listener.onUpdateFunction(udf.name().keyspace, udf.name().name);
-}
-
-public void notifyUpdateAggregate(UDAggregate udf)
-{
- for (IMigrationListener listener : listeners)
- listener.onUpdateAggregate(udf.name().keyspace, udf.name().name);
-}
-#endif
-
future<> migration_notifier::drop_keyspace(sstring ks_name) {
co_await on_schema_change([&] (migration_listener* listener) {
listener->on_drop_keyspace(ks_name);
@@ -637,20 +609,6 @@ void migration_notifier::before_drop_keyspace(const sstring& keyspace_name,
});
}

-#if 0
-public void notifyDropFunction(UDFunction udf)
-{
- for (IMigrationListener listener : listeners)
- listener.onDropFunction(udf.name().keyspace, udf.name().name);
-}
-
-public void notifyDropAggregate(UDAggregate udf)
-{
- for (IMigrationListener listener : listeners)
- listener.onDropAggregate(udf.name().keyspace, udf.name().name);
-}
-#endif
-
std::vector<mutation> prepare_keyspace_update_announcement(replica::database& db, lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type ts) {
db.validate_keyspace_update(*ksm);
mlogger.info("Update Keyspace: {}", ksm);
@@ -693,9 +651,6 @@ static future<std::vector<mutation>> do_prepare_new_column_family_announcement(s
}

future<std::vector<mutation>> prepare_new_column_family_announcement(storage_proxy& sp, schema_ptr cfm, api::timestamp_type timestamp) {
-#if 0
- cfm.validate();
-#endif
try {
auto& db = sp.get_db().local();
auto ksm = db.find_keyspace(cfm->ks_name()).metadata();
@@ -717,15 +672,9 @@ future<> prepare_new_column_family_announcement(std::vector<mutation>& mutations
future<std::vector<mutation>> prepare_column_family_update_announcement(storage_proxy& sp,
schema_ptr cfm, std::vector<view_ptr> view_updates, api::timestamp_type ts) {
warn(unimplemented::cause::VALIDATION);
-#if 0
- cfm.validate();
-#endif
try {
auto& db = sp.local_db();
auto&& old_schema = db.find_column_family(cfm->ks_name(), cfm->cf_name()).schema(); // FIXME: Should we lookup by id?
-#if 0
- oldCfm.validateCompatility(cfm);
-#endif
mlogger.info("Update table '{}.{}' From {} To {}", cfm->ks_name(), cfm->cf_name(), *old_schema, *cfm);
auto&& keyspace = db.find_keyspace(cfm->ks_name()).metadata();

@@ -872,9 +821,6 @@ future<std::vector<mutation>> prepare_type_drop_announcement(storage_proxy& sp,
}

future<std::vector<mutation>> prepare_new_view_announcement(storage_proxy& sp, view_ptr view, api::timestamp_type ts) {
-#if 0
- view.metadata.validate();
-#endif
auto& db = sp.local_db();
try {
auto keyspace = db.find_keyspace(view->ks_name()).metadata();
@@ -898,19 +844,13 @@ future<std::vector<mutation>> prepare_new_view_announcement(storage_proxy& sp, v
}

future<std::vector<mutation>> prepare_view_update_announcement(storage_proxy& sp, view_ptr view, api::timestamp_type ts) {
-#if 0
- view.metadata.validate();
-#endif
auto db = sp.data_dictionary();
try {
auto&& keyspace = db.find_keyspace(view->ks_name()).metadata();
auto& old_view = keyspace->cf_meta_data().at(view->cf_name());
if (!old_view->is_view()) {
co_await coroutine::return_exception(exceptions::invalid_request_exception("Cannot use ALTER MATERIALIZED VIEW on Table"));
}
-#if 0
- oldCfm.validateCompatility(cfm);
-#endif
mlogger.info("Update view '{}.{}' From {} To {}", view->ks_name(), view->cf_name(), *old_view, *view);
auto mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(old_view), std::move(view), ts, true);
co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
@@ -1075,75 +1015,6 @@ future<> migration_manager::passive_announce() {
return _gossiper.add_local_application_state(gms::application_state::SCHEMA, gms::versioned_value::schema(_schema_version_to_publish));
}

-#if 0
-/**
- * Clear all locally stored schema information and reset schema to initial state.
- * Called by user (via JMX) who wants to get rid of schema disagreement.
- *
- * @throws IOException if schema tables truncation fails
- */
-public static void resetLocalSchema() throws IOException
-{
- mlogger.info("Starting local schema reset...");
-
- mlogger.debug("Truncating schema tables...");
-
- LegacySchemaTables.truncateSchemaTables();
-
- mlogger.debug("Clearing local schema keyspace definitions...");
-
- Schema.instance.clear();
-
- Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
- liveEndpoints.remove(FBUtilities.getBroadcastAddress());
-
- // force migration if there are nodes around
- for (InetAddress node : liveEndpoints)
- {
- if (shouldPullSchemaFrom(node))
- {
- mlogger.debug("Requesting schema from {}", node);
- FBUtilities.waitOnFuture(submitMigrationTask(node));
- break;
- }
- }
-
- mlogger.info("Local schema reset is complete.");
-}
-
-public static class MigrationsSerializer implements IVersionedSerializer<Collection<Mutation>>
-{
- public static MigrationsSerializer instance = new MigrationsSerializer();
-
- public void serialize(Collection<Mutation> schema, DataOutputPlus out, int version) throws IOException
- {
- out.writeInt(schema.size());
- for (Mutation mutation : schema)
- Mutation.serializer.serialize(mutation, out, version);
- }
-
- public Collection<Mutation> deserialize(DataInput in, int version) throws IOException
- {
- int count = in.readInt();
- Collection<Mutation> schema = new ArrayList<>(count);
-
- for (int i = 0; i < count; i++)
- schema.add(Mutation.serializer.deserialize(in, version));
-
- return schema;
- }
-
- public long serializedSize(Collection<Mutation> schema, int version)
- {
- int size = TypeSizes.NATIVE.sizeof(schema.size());
- for (Mutation mutation : schema)
- size += Mutation.serializer.serializedSize(mutation, version);
- return size;
- }
-}
-#endif
-
-
// Ensure that given schema version 's' was synced with on current node. See schema::is_synced().
//
// The endpoint is the node from which 's' originated.
Reply all
Reply to author
Forward
0 new messages