from django.db.backends.mysql import schema as myschema


class DatabaseSchemaEditor(myschema.DatabaseSchemaEditor):

    sql_delete_table = "DROP TABLE %(table)s"
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"
    sql_create_table = "CREATE TABLE %(table)s (%(definition)s) AUTO_INCREMENT=10000"

    def remove_field(self, model, field):
        # Drop any index first; TiDB requires explicit deletion
        if field.db_index:
            idx_names = self._constraint_names(model, [field.column], index=True)
            for idx_name in idx_names:
                self.execute(self._delete_constraint_sql(self.sql_delete_index, model, idx_name))
        super(DatabaseSchemaEditor, self).remove_field(model, field)

    def column_sql(self, model, field, include_default=False):
        """
        Take a field and return its column definition.
        The field must already have had set_attributes_from_name() called.
        """
        # Get the column's type and use that as the basis of the SQL
        db_params = field.db_parameters(connection=self.connection)
        sql = db_params['type']
        params = []
        # Check for fields that aren't actually columns (e.g. M2M)
        if sql is None:
            return None, None
        # Work out nullability
        null = field.null
        # If we were told to include a default value, do so
        include_default = include_default and not self.skip_default(field)
        if include_default:
            default_value = self.effective_default(field)
            if default_value is not None:
                if self.connection.features.requires_literal_defaults:
                    # Some databases can't take defaults as a parameter (Oracle).
                    # If this is the case, the individual schema backend should
                    # implement prepare_default().
                    sql += " DEFAULT %s" % self.prepare_default(default_value)
                else:
                    sql += " DEFAULT %s"
                    params += [default_value]
        # Oracle treats the empty string ('') as null, so coerce the null
        # option whenever '' is a possible value.
        if (field.empty_strings_allowed and not field.primary_key and
                self.connection.features.interprets_empty_strings_as_nulls):
            null = True
        if null and not self.connection.features.implied_column_null:
            sql += " NULL"
        elif not null:
            sql += " NOT NULL"
        # Primary key/unique outputs
        if field.primary_key:
            sql += " PRIMARY KEY"
        # Unlike the parent MySQL backend, skip the inline UNIQUE output here:
        # elif field.unique:
        #     sql += " UNIQUE"
        # Optionally add the tablespace if it's an implicitly indexed column
        tablespace = field.db_tablespace or model._meta.db_tablespace
        if tablespace and self.connection.features.supports_tablespaces and field.unique:
            sql += " %s" % self.connection.ops.tablespace_sql(tablespace, inline=True)
        # Return the SQL
        return sql, params
    def create_model(self, model):
        """
        Create a table and any accompanying indexes or unique constraints for
        the given `model`.
        """
        # Create column SQL, add FK deferreds if needed
        column_sqls = []
        params = []
        for field in model._meta.local_fields:
            # SQL
            definition, extra_params = self.column_sql(model, field)
            if definition is None:
                continue
            # Check constraints can go on the column SQL here
            db_params = field.db_parameters(connection=self.connection)
            if db_params['check']:
                definition += " CHECK (%s)" % db_params['check']
            # Autoincrement SQL (for backends with inline variant)
            col_type_suffix = field.db_type_suffix(connection=self.connection)
            if col_type_suffix:
                definition += " %s" % col_type_suffix
            params.extend(extra_params)
            # FK
            if field.remote_field and field.db_constraint:
                to_table = field.remote_field.model._meta.db_table
                to_column = field.remote_field.model._meta.get_field(field.remote_field.field_name).column
                if self.sql_create_inline_fk:
                    definition += " " + self.sql_create_inline_fk % {
                        "to_table": self.quote_name(to_table),
                        "to_column": self.quote_name(to_column),
                    }
                elif self.connection.features.supports_foreign_keys:
                    self.deferred_sql.append(self._create_fk_sql(model, field, "_fk_%(to_table)s_%(to_column)s"))
            # Add the SQL to our big list
            column_sqls.append("%s %s" % (
                self.quote_name(field.column),
                definition,
            ))
            # Autoincrement SQL (for backends with post table definition variant)
            if field.get_internal_type() in ("AutoField", "BigAutoField"):
                autoinc_sql = self.connection.ops.autoinc_sql(model._meta.db_table, field.column)
                if autoinc_sql:
                    self.deferred_sql.extend(autoinc_sql)
        # Add any unique_togethers (always deferred, as some fields might be
        # created afterwards, like geometry fields with some backends)
        for fields in model._meta.unique_together:
            columns = [model._meta.get_field(field).column for field in fields]
            self.deferred_sql.append(self._create_unique_sql(model, columns))
        # Make the table
        sql = self.sql_create_table % {
            "table": self.quote_name(model._meta.db_table),
            "definition": ", ".join(column_sqls),
        }
        if model._meta.db_tablespace:
            tablespace_sql = self.connection.ops.tablespace_sql(model._meta.db_tablespace)
            if tablespace_sql:
                sql += ' ' + tablespace_sql
        # Prevent using [] as params, in the case a literal '%' is used in the definition
        self.execute(sql, params or None)
        # Add any field index and index_together's (deferred as SQLite3 _remake_table needs it)
        self.deferred_sql.extend(self._model_indexes_sql(model))
        # Make M2M tables
        for field in model._meta.local_many_to_many:
            if field.remote_field.through._meta.auto_created:
                self.create_model(field.remote_field.through)
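
Django only picks up a schema editor like this once a backend's DatabaseWrapper exposes it through the standard SchemaEditorClass attribute. Below is a minimal sketch of that wiring, assuming the class above lives in a schema.py module inside a backend package named tidb; the package and module names are illustrative assumptions, while SchemaEditorClass itself is the regular Django hook.

# base.py of a hypothetical "tidb" backend package (names are assumptions)
from django.db.backends.mysql import base as mysql_base

from .schema import DatabaseSchemaEditor


class DatabaseWrapper(mysql_base.DatabaseWrapper):
    # Route all migration DDL through the TiDB-aware schema editor above
    SchemaEditorClass = DatabaseSchemaEditor

With that in place, settings.py would point the database ENGINE at the backend package (for example 'ENGINE': 'tidb'), so that migrate and the test runner go through the overridden remove_field, column_sql, and create_model instead of the stock MySQL versions.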