diff options
author | Aaron Schulz <aschulz@wikimedia.org> | 2012-06-24 10:41:27 -0700 |
---|---|---|
committer | Aaron Schulz <aschulz@wikimedia.org> | 2012-12-04 15:51:12 -0800 |
commit | b028f9a3d2afba6c54deb600ea1003afcc643b77 (patch) | |
tree | 2dc9c87ee2ed20e23504ac7c11c38e96c607df01 | |
parent | 8112995e3767485f6d73393e6ed6ccfcb42b9d98 (diff) |
[RDBStore] Added sharded, external, DB support. (ref: origin/sandbox/aaron/rdbstore)
* Added LocalRDBStore and ExternalRDBStore classes and helper scripts.
* Added $wgRDBStores variable to configure the RDB stores.
* Added $wgRDBStoredTables to configure where tables are stored.
* Added hooks so extensions can force tables to be on the same store
and register denormalized tables that can be re-synced.
Change-Id: Ic1e38db3d325d52ded6d2596af2b6bd3e9b870fe
-rw-r--r-- | docs/hooks.txt | 13 | ||||
-rw-r--r-- | includes/AutoLoader.php | 10 | ||||
-rw-r--r-- | includes/DefaultSettings.php | 30 | ||||
-rw-r--r-- | includes/rdbstore/ExternalRDBStore.php | 608 | ||||
-rw-r--r-- | includes/rdbstore/ExternalRDBStoreTablePartition.php | 89 | ||||
-rw-r--r-- | includes/rdbstore/ExternalRDBStoreTrxJournal.php | 278 | ||||
-rw-r--r-- | includes/rdbstore/LocalRDBStore.php | 128 | ||||
-rw-r--r-- | includes/rdbstore/LocalRDBStoreTablePartition.php | 64 | ||||
-rw-r--r-- | includes/rdbstore/RDBStore.php | 354 | ||||
-rw-r--r-- | includes/rdbstore/RDBStoreGroup.php | 183 | ||||
-rw-r--r-- | includes/rdbstore/RDBStoreTablePartition.php | 255 | ||||
-rw-r--r-- | maintenance/rdbstore/RDBAddCluster.php | 66 | ||||
-rw-r--r-- | maintenance/rdbstore/RDBPopulation.php | 98 | ||||
-rw-r--r-- | maintenance/rdbstore/RDBSchemaChange.php | 124 | ||||
-rw-r--r-- | maintenance/rdbstore/RDBSyncPartitions.php | 236 | ||||
-rw-r--r-- | maintenance/rdbstore/sample.sql | 12 |
16 files changed, 2548 insertions, 0 deletions
diff --git a/docs/hooks.txt b/docs/hooks.txt index 0c8780dd28c5..84dbf2487ffa 100644 --- a/docs/hooks.txt +++ b/docs/hooks.txt @@ -1768,6 +1768,19 @@ $out: OutputPage object &$obj: RawPage object &$text: The text that's going to be the output +'RDBStoreTableSchemaInfo': Called when the RDBStoreGroup singleton is initialized. +&$groups : Map of group names to a list of tables. + Extensions may register tables here to make sure groups of tables are in the same store. +&$syncInfo : Map of tables to denormalization information, which consists of: + - srcShardKey : the canonical shard column name + - dupShardKeys : list of shard columns for duplicated rows + - uniqueKey : list of the fields that comprise a unique key + - timeColumn : indexed timestamp column name + This should only be used for tables that duplicate data to shard on different + columns (e.g. user and object ID) and where the table is also either: + - (a) insert-only (no updates/deletes) + - (b) insert/update-only and all duplicate rows only have immutable columns + 'RecentChange_save': called at the end of RecentChange::save() $recentChange: RecentChange object diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php index 461fadfe66fe..22ba0c461d8a 100644 --- a/includes/AutoLoader.php +++ b/includes/AutoLoader.php @@ -510,6 +510,16 @@ $wgAutoloadLocalClasses = array( 'SavepointPostgres' => 'includes/db/DatabasePostgres.php', 'SQLiteField' => 'includes/db/DatabaseSqlite.php', + # includes/rdbstore + 'ExternalRDBStore' => 'includes/rdbstore/ExternalRDBStore.php', + 'ExternalRDBStoreTablePartition' => 'includes/rdbstore/ExternalRDBStoreTablePartition.php', + 'ExternalRDBStoreTrxJournal' => 'includes/rdbstore/ExternalRDBStoreTrxJournal.php', + 'LocalRDBStore' => 'includes/rdbstore/LocalRDBStore.php', + 'LocalRDBStoreTablePartition' => 'includes/rdbstore/LocalRDBStoreTablePartition.php', + 'RDBStore' => 'includes/rdbstore/RDBStore.php', + 'RDBStoreGroup' => 'includes/rdbstore/RDBStoreGroup.php', + 
'RDBStoreTablePartition' => 'includes/rdbstore/RDBStoreTablePartition.php', + # includes/debug 'MWDebug' => 'includes/debug/Debug.php', diff --git a/includes/DefaultSettings.php b/includes/DefaultSettings.php index 2073f168b732..233142109f90 100644 --- a/includes/DefaultSettings.php +++ b/includes/DefaultSettings.php @@ -1626,6 +1626,36 @@ $wgExternalServers = array(); $wgDefaultExternalStore = false; /** + * Associative array of storage names to configuration arrays, each having: + * - clusterPrefix : The prefix to each external cluster name + * - clusterCount : The number of clusters in the store (power of 2) + * The clusters are all named <clusterPrefix><x>, where x is in [1,<clusterCount>]. + * These clusters need to actually exist in $wgExternalServers or $wgLBFactoryConf. + * + * This setting must be global to all wikis that may use the store. + * + * Example use: + * $wgRDBStores['store1'] = array( 'clusterPrefix' => 'main-rdb-cluster', 'clusterCount' => 2 ) + * + * @var array + */ +$wgRDBStores = array(); + +/** + * Map of tables to external $wgRDBStores names for each wiki. + * Core or extensions may check this to determine where a table is placed. + * This can be used to partition large tables into horizontally scalable RDBMs. + * + * This setting must be global to all wikis that may use the store. + * + * Example use: + * $wgRDBStoredTables['mywiki'] = array( 'table1' => 'store1', 'table2' => 'store1' ) + * + * @var array + */ +$wgRDBStoredTables = array(); + +/** * Revision text may be cached in $wgMemc to reduce load on external storage * servers and object extraction overhead for frequently-loaded revisions. * diff --git a/includes/rdbstore/ExternalRDBStore.php b/includes/rdbstore/ExternalRDBStore.php new file mode 100644 index 000000000000..536f523eb411 --- /dev/null +++ b/includes/rdbstore/ExternalRDBStore.php @@ -0,0 +1,608 @@ +<?php +/** + * This file deals with sharded RDBMs stores. 
+ * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup RDBStore + * @author Aaron Schulz + */ + +/** + * Class representing an external DB storage system. + * Tables are sharded vertically based on the wiki ID and + * horizontally based on a table column used as a "shard key". + * + * The shard key determines what cluster a table partition maps to. + * We use cluster = (integerhash(column value) mod (# of clusters)) + 1. + * The 1 is added to the remainder so the cluster names start at "1". + * + * The number of clusters must be a power of 2. This makes the re-balancing required + * with the addition of new clusters fairly straightforward and avoids downtime. 
+ * For example, say we have four clusters: + * cluster1 [hash has remainder 0] + * cluster2 [hash has remainder 1] + * cluster3 [hash has remainder 2] + * cluster4 [hash has remainder 3] + * We can add four new clusters, resulting in the following: + * cluster1 [remainder (0 now, 0 before)] + * cluster2 [remainder (1 now, 1 before)] + * cluster3 [remainder (2 now, 2 before)] + * cluster4 [remainder (3 now, 3 before)] + * cluster5 [start as replica of cluster1] [remainder (4 now, 0 before)] + * cluster6 [start as replica of cluster2] [remainder (5 now, 1 before)] + * cluster7 [start as replica of cluster3] [remainder (6 now, 2 before)] + * cluster8 [start as replica of cluster4] [remainder (7 now, 3 before)] + * What was in cluster1 is now split between cluster1 and cluster5. + * Since cluster5 started as a full clone of cluster1 (via MySQL replication), + * the only disruption will be a brief read-only period where cluster5 is + * caught up and the ExternalRDBStore cluster config is updated on the servers. + * After the change is done, there will be unused, outdated, duplicate partition + * tables on the wrong shards. These can be dropped as needed for disk space. + * The same trick can be used to keep doubling the amount of storage. + * + * If 2PC is enabled, then $wgDebugLogGroups['RDBStoreTrx'] should be set. 
+ * + * @ingroup RDBStore + * @since 1.20 + */ +class ExternalRDBStore extends RDBStore { + /** @var Array */ + protected $clusters = array(); // list of cluster names + /** @var ExternalRDBStoreTrxJournal */ + protected $trxJournal; + /** @var Array */ + protected $connsWithTrx = array(); // list of DatabaseBase objects + + protected $trxIdXA = null; // string + + protected $name; // string + protected $clusterCount; // integer + protected $clusterPrefix; // string + protected $enable2PC = false; // bool + + const SHARD_COUNT = 256; // integer; consistent partitions per (wiki,table) (power of 2) + + /** + * @param $options array + * @throws MWException + */ + public function __construct( array $options ) { + parent::__construct( $options ); + + $this->name = $options['name']; + if ( !strlen( $options['clusterPrefix'] ) ) { + throw new MWException( "Option 'clusterPrefix' is not valid." ); + } + $this->clusterPrefix = $options['clusterPrefix']; + + $logB2 = log( $options['clusterCount'], 2 ); // float + if ( $logB2 != floor( $logB2 ) ) { + throw new MWException( "Option 'clusterCount' must be a power of 2." ); + } + $this->clusterCount = $options['clusterCount']; + + for ( $i = 1; $i <= $options['clusterCount']; $i++ ) { + $this->clusters[] = $options['clusterPrefix'] . $i; + } + + if ( isset( $options['enable2PC'] ) ) { + $this->enable2PC = $options['enable2PC']; + } + $liveCheck2PC = isset( $options['liveCheck2PC'] ) + ? 
$options['liveCheck2PC'] + : $this->enable2PC; + + $this->trxJournal = new ExternalRDBStoreTrxJournal( $this ); + if ( $liveCheck2PC ) { + $this->trxJournal->periodicLogCheck(); // resolve limbo transactions + } + } + + /** + * @see RDBStore::getName() + * @return string + */ + public function getName() { + return $this->name; + } + + /** + * @see RDBStore::isInternal() + * @return bool + */ + public function isInternal() { + return false; + } + + /** + * @see RDBStore::isPartitioned() + * @return bool + */ + public function isPartitioned() { + return true; + } + + /** + * @see RDBStore::beginOutermost() + * @see DatabaseBase::begin() + * @return void + */ + protected function beginOutermost() { + if ( $this->enable2PC ) { + $this->trxIdXA = wfTimestamp( TS_MW ) . '-' . wfRandomString( 32 ); + wfDebug( "Starting XA transaction with ID {$this->trxIdXA}.\n" ); + } else { + wfDebug( "Starting transaction.\n" ); + } + // ExternalRDBStoreTablePartition will make sure that DB write queries are + // wrapped in a transaction, which this class will commit in finishOutermost(). + // These connections in the transaction are tracked in $this->connsWithTrx. + } + + /** + * @see RDBStore::finishOutermost() + * @see DatabaseBase::commit() + * @return void + */ + protected function finishOutermost() { + $xaTrxConns = array(); // DB servers in the XA transaction + // End all transactions, committing regular local transactions... + foreach ( $this->connsWithTrx as $conn ) { // DBs with XA/local transactions + if ( isset( $conn->xa_trx_id ) ) { // part of XA transaction + $xaTrxConns[] = $conn; + } elseif ( $conn->trxLevel() ) { // non-XA transaction + $conn->commit(); // regular COMMIT + $this->unregisterConnTrxInternal( $conn ); + } + } + // Prepare and commit the (ended) XA transactions... 
+ if ( count( $xaTrxConns ) == 1 ) { // single DB transaction (1PC) + $conn = reset( $xaTrxConns ); + $this->connCommitXAInternal( $conn, '1PC' ); // also unregisters + wfDebug( "Committed XA transaction with ID {$this->trxIdXA}.\n" ); + } elseif ( count( $xaTrxConns ) >= 1 ) { // distributed DB transaction (2PC) + $clusters = array_map( function( $conn ) { return $conn->xa_cluster; }, $xaTrxConns ); + $this->trxJournal->onPrePrepare( $this->trxIdXA, $clusters ); + $connsPrepared = array(); // DBs servers prepared in the XA transaction + try { // try to prepare on all DB servers... + foreach ( $xaTrxConns as $conn ) { + $this->connPrepareXAInternal( $conn ); + $connsPrepared[] = $conn; + } + } catch ( DBError $e ) { // rollback all DB servers on failure... + foreach ( $connsPrepared as $conn ) { + try { // rollback as much as we can now + $this->connRollbackXAInternal( $conn ); // also unregisters + } catch ( DBError $e2 ) {}; + } + throw $e; // throw original exception back + } + $this->trxJournal->onPreCommit(); + foreach ( $connsPrepared as $conn ) { + $this->connCommitXAInternal( $conn, '2PC' ); // also unregisters + } + $this->trxJournal->onPostCommit(); + wfDebug( "Committed distributed XA transaction with ID {$this->trxIdXA}.\n" ); + } else { + wfDebug( "Committed transaction.\n" ); + } + $this->trxIdXA = null; // done + } + + /** + * Get an object representing a shard of a virtual DB table. + * Each table is sharded on at least one column key, and possibly + * denormalized and sharded on muliple column keys (e.g. rev ID, page ID, user ID). + * + * @see RDBStore::doGetPartition() + * @return ExternalRDBStoreTablePartition + */ + protected function doGetPartition( $table, $column, $value, $wiki ) { + // Map this row to a consistent table shard, which only depends on $value. + // This mapping MUST always remain consistent (immutable)! 
+ $hash = substr( sha1( $value ), 0, 4 ); // 65536 possible values + $index = (int)base_convert( $hash, 16, 10 ) % self::SHARD_COUNT; // [0,255] + return new ExternalRDBStoreTablePartition( $this, $table, $index, $column, $value, $wiki ); + } + + /** + * @see RDBStore::doGetAllPartitions() + * @return Array List of ExternalRDBStoreTablePartition objects + */ + protected function doGetAllPartitions( $table, $column, $wiki ) { + $partitions = array(); + for ( $index = 0; $index < self::SHARD_COUNT; $index++ ) { + $partitions[] = new ExternalRDBStoreTablePartition( + $this, $table, $index, $column, null, $wiki + ); + } + return $partitions; + } + + /** + * Outside callers should generally not need this and should avoid using it + * + * @return Array List of cluster names for this store. + */ + public function getClusters() { + return $this->clusters; + } + + /** + * Get a map of DB cluster names to shard indexes they serve. + * Outside callers should generally not need this and should avoid using it. + * + * @return Array + */ + public function getClusterMapping() { + $map = array(); + for ( $index = 0; $index < self::SHARD_COUNT; $index++ ) { + $map[$this->getClusterForIndex( $index )][] = $index; + } + return $map; + } + + /** + * Outside callers should generally not need this and should avoid using it. + * + * @return bool Two-Phase-Commit is enabled + */ + public function is2PCEnabled() { + return $this->enable2PC; + } + + /** + * Format a shard number by padding out the digits as needed. + * Outside callers should generally not need this and should avoid using it. 
+ * + * @param $index integer + * @return string + */ + public function formatShardIndex( $index ) { + $decimals = strlen( self::SHARD_COUNT - 1 ); + return sprintf( "%0{$decimals}d", $index ); // e.g "033" + } + + /** + * Outside callers should generally not need this and should avoid using it + * + * @param $index integer Partition index + * @return string Name of DB cluster responsible for this shard index + * @throws MWException + */ + public function getClusterForIndex( $index ) { + if ( $index < 0 || $index >= self::SHARD_COUNT ) { + throw new MWException( "Index $index is not a valid partition index." ); + } + // This mapping MUST always remain consistent (immutable)! + return $this->clusterPrefix . ( ( $index % $this->clusterCount ) + 1 ); + } + + /** + * Outside callers should generally not need this and should avoid using it + * + * @param $index integer Partition index + * @return LoadBalancer For the cluster the partition index is on + */ + public function getClusterLBForIndex( $index ) { + return wfGetLBFactory()->getExternalLB( $this->getClusterForIndex( $index ) ); + } + + /** + * Return all table partitions that exist for a virtual table. + * This can be used to verify schema change completion for a table. + * + * @see DatabaseBase::tableExists() + * @param $table string Virtual table name + * @param $column string Shard key column name + * @param $wiki string Wiki ID (default to current wiki) + * @return Array List of partition table names + */ + public function partitionsWhereTableExists( $table, $column, $wiki = false ) { + $ctpartitions = array(); + $wiki = ( $wiki === false ) ? wfWikiID() : $wiki; + foreach ( $this->doGetAllPartitions( $table, $column, $wiki ) as $tp ) { + if ( $tp->getMasterDB()->tableExists( $tp->getPartitionTable() ) ) { + $ctpartitions[] = $tp->getPartitionTable(); + } + } + return $ctpartitions; + } + + /** + * Return all table partitions that do not exist for a virtual table. 
+ * This can be used to verify schema change completion for a table. + * + * @see DatabaseBase::tableExists() + * @param $table string Virtual table name + * @param $column string Shard key column name + * @param $wiki string Wiki ID (default to current wiki) + * @return Array List of partition table names + */ + public function partitionsWhereTableNotExists( $table, $column, $wiki = false ) { + return array_diff( + $this->getPartitionTableNames( $table, $column ), + $this->partitionsWhereTableExists( $table, $column, $wiki ) + ); + } + + /** + * Return all table partitions that have a specified column. + * This can be used to verify schema change completion for a table. + * + * @see DatabaseBase::fieldExists() + * @param $table string Virtual table name + * @param $column string Shard key column name + * @param $field string Column name to check + * @param $wiki string Wiki ID (default to current wiki) + * @return Array List of partition table names + */ + public function partitionsWhereFieldExists( $table, $column, $field, $wiki = false ) { + $ctpartitions = array(); + $wiki = ( $wiki === false ) ? wfWikiID() : $wiki; + foreach ( $this->doGetAllPartitions( $table, $column, $wiki ) as $tp ) { + if ( $tp->getMasterDB()->fieldExists( $tp->getPartitionTable(), $field ) ) { + $ctpartitions[] = $tp->getPartitionTable(); + } + } + return $ctpartitions; + } + + /** + * Return all table partitions that do not have a specified column. + * This can be used to verify schema change completion for a table. 
+ * + * @see DatabaseBase::fieldExists() + * @param $table string Virtual table name + * @param $column string Shard key column name + * @param $field string Column name to check + * @param $wiki string Wiki ID (default to current wiki) + * @return Array List of partition table names + */ + public function partitionsWhereFieldNotExists( $table, $column, $field, $wiki = false ) { + return array_diff( + $this->getPartitionTableNames( $table, $column ), + $this->partitionsWhereFieldExists( $table, $column, $field, $wiki ) + ); + } + + /** + * Return all table partitions that have a specified index. + * This can be used to verify schema change completion for a table. + * + * @see DatabaseBase::indexExists() + * @param $table string Virtual table name + * @param $column string Shard key column name + * @param $index string Index name to check + * @param $wiki string Wiki ID (default to current wiki) + * @return Array List of partition table names + */ + public function partitionsWhereIndexExists( $table, $column, $index, $wiki = false ) { + $ctpartitions = array(); + $wiki = ( $wiki === false ) ? wfWikiID() : $wiki; + foreach ( $this->doGetAllPartitions( $table, $column, $wiki ) as $tp ) { + if ( $tp->getMasterDB()->indexExists( $tp->getPartitionTable(), $index ) ) { + $ctpartitions[] = $tp->getPartitionTable(); + } + } + return $ctpartitions; + } + + /** + * Return all table partitions that do not have a specified index. + * This can be used to verify schema change completion for a table. 
+ * + * @see DatabaseBase::indexExists() + * @param $table string Virtual table name + * @param $column string Shard key column name + * @param $index string Index name to check + * @param $wiki string Wiki ID (default to current wiki) + * @return Array List of partition table names + */ + public function partitionsWhereIndexNotExists( $table, $column, $index, $wiki = false ) { + return array_diff( + $this->getPartitionTableNames( $table, $column ), + $this->partitionsWhereIndexExists( $table, $column, $index, $wiki ) + ); + } + + /** + * Get a list of the partition tables for a given virtual DB table and shard column. + * Outside callers should generally not need this and should avoid using it. + * + * @param $table string Virtual DB table + * @param $column string Column the table is sharded on + * @return Array List of partition table names + */ + protected function getPartitionTableNames( $table, $column ) { + if ( !is_string( $table ) || !is_string( $column ) ) { + throw new MWException( "Missing table or column name." 
); + } + $pTables = array(); + for ( $index = 0; $index < self::SHARD_COUNT; $index++ ) { + $shard = $this->formatShardIndex( $index ); // e.g "033" + $pTables[] = "{$table}__{$shard}__{$column}"; + } + return $pTables; + } + + /** + * Do not call this function from places outside ExternalRDBStore + * + * @return string|null Format is "<14 digits>-<32 hex chars>" + */ + public function getTrxIdXAInternal() { + return $this->trxIdXA; + } + + /** + * Do not call this function from places outside ExternalRDBStore + * + * @param $conn DatabaseBase + * @return void + */ + public function registerConnTrxInternal( DatabaseBase $conn ) { + $this->connsWithTrx[] = $conn; + } + + /** + * Do not call this function from places outside ExternalRDBStore + * + * @param $conn DatabaseBase + * @return void + */ + public function unregisterConnTrxInternal( DatabaseBase $conn ) { + unset( $this->connsWithTrx[array_search( $conn, $this->connsWithTrx )] ); + } + + /** + * Do not call this function from places outside ExternalRDBStore + * + * @param $conn DatabaseBase + * @param $cluster string Cluster name for $conn + * @return void + * @throws DBUnexpectedError + */ + public function connStartXAInternal( DatabaseBase $conn, $cluster ) { + if ( !strlen( $cluster ) ) { + throw new DBUnexpectedError( $conn, "No DB cluster name provided." ); + } + if ( !isset( $conn->xa_trx_id ) ) { // not in XA transaction already + $conn->clearFlag( DBO_TRX ); // use XA statements instead of BEGIN + if ( $conn->getType() === 'mysql' ) { + $conn->query( "XA START {$conn->addQuotes($this->trxIdXA)}", __METHOD__ ); + } elseif ( $conn->getType() === 'postgres' ) { + $conn->query( "BEGIN", __METHOD__ ); + } else { + throw new DBUnexpectedError( $conn, "DB does not support 2PC." 
); + } + $conn->xa_cluster = $cluster; + $conn->xa_trx_id = $this->trxIdXA; + $this->registerConnTrxInternal( $conn ); + } + } + + /** + * Do not call this function from places outside ExternalRDBStore + * + * @param $conn DatabaseBase + * @return void + * @throws DBUnexpectedError + */ + public function connPrepareXAInternal( DatabaseBase $conn ) { + if ( $conn->getType() === 'mysql' ) { + $conn->query( "XA END {$conn->addQuotes($this->trxIdXA)}", __METHOD__ ); + $conn->query( "XA PREPARE {$conn->addQuotes($this->trxIdXA)}", __METHOD__ ); + } elseif ( $conn->getType() === 'postgres' ) { + $conn->query( "PREPARE TRANSACTION {$conn->addQuotes($this->trxIdXA)}", __METHOD__ ); + } else { + throw new DBUnexpectedError( $conn, "DB does not support 2PC." ); + } + } + + /** + * Do not call this function from places outside ExternalRDBStore + * + * @param $conn DatabaseBase + * @param $mode string Either '1PC' or '2PC' + * @param $id string Transaction ID (defaults to current) + * @return void + * @throws DBUnexpectedError + */ + public function connCommitXAInternal( DatabaseBase $conn, $mode = '2PC', $id = null ) { + $id = $id ? $id : $this->trxIdXA; + if ( $conn->getType() === 'mysql' ) { + if ( $mode === '1PC' ) { + $conn->query( "XA END {$conn->addQuotes($this->trxIdXA)}", __METHOD__ ); + $conn->query( "XA COMMIT {$conn->addQuotes($id)} ONE PHASE", __METHOD__ ); + } else { // 2PC + $conn->query( "XA COMMIT {$conn->addQuotes($id)}", __METHOD__ ); + } + } elseif ( $conn->getType() === 'postgres' ) { + if ( $mode === '1PC' ) { + $conn->query( "COMMIT" ); + } else { // 2PC + $conn->query( "COMMIT PREPARED {$conn->addQuotes($id)}", __METHOD__ ); + } + } else { + throw new DBUnexpectedError( $conn, "DB does not support 2PC." ); + } + unset( $conn->xa_trx_id ); // done + if ( $id === $this->trxIdXA ) { // from begin()/finish()? 
+ $this->unregisterConnTrxInternal( $conn ); + } + } + + /** + * Do not call this function from places outside ExternalRDBStore + * + * @param $conn DatabaseBase + * @param $id string Transaction ID (defaults to current) + * @return void + * @throws DBUnexpectedError + */ + public function connRollbackXAInternal( DatabaseBase $conn, $id = null ) { + $id = $id ? $id : $this->trxIdXA; + if ( $conn->getType() === 'mysql' ) { + $conn->query( "XA ROLLBACK {$conn->addQuotes($id)}", __METHOD__ ); + } elseif ( $conn->getType() === 'postgres' ) { + $conn->query( "ROLLBACK PREPARED {$conn->addQuotes($id)}", __METHOD__ ); + } else { + throw new DBUnexpectedError( $conn, "DB does not support 2PC." ); + } + unset( $conn->xa_trx_id ); // done + if ( $id === $this->trxIdXA ) { // from begin()/finish()? + $this->unregisterConnTrxInternal( $conn ); + } + } + + /** + * Do not call this function from places outside ExternalRDBStore + * + * @param $conn DatabaseBase + * @param $id string Transaction ID + * @return bool + * @throws DBUnexpectedError + */ + public function connExistsXAInternal( DatabaseBase $conn, $id ) { + return in_array( $id, $this->connGetAllPreparedXAInternal( $conn ) ); + } + + /** + * Do not call this function from places outside ExternalRDBStore + * + * @param $conn DatabaseBase + * @return Array List of prepared transaction IDs + * @throws DBUnexpectedError + */ + public function connGetAllPreparedXAInternal( DatabaseBase $conn ) { + $trxIds = array(); + if ( $conn->getType() === 'mysql' ) { + foreach ( $conn->query( "XA RECOVER", __METHOD__ ) as $row ) { + $trxIds[] = $row->data; + } + } elseif ( $conn->getType() === 'postgres' ) { + foreach ( $conn->query( "SELECT * FROM pg_prepared_xacts", __METHOD__ ) as $row ) { + $trxIds[] = $row->transaction; + } + } else { + throw new DBUnexpectedError( $conn, "DB does not support 2PC." 
); + } + return $trxIds; + } +} diff --git a/includes/rdbstore/ExternalRDBStoreTablePartition.php b/includes/rdbstore/ExternalRDBStoreTablePartition.php new file mode 100644 index 000000000000..ca217c0d2f9a --- /dev/null +++ b/includes/rdbstore/ExternalRDBStoreTablePartition.php @@ -0,0 +1,89 @@ +<?php +/** + * This file deals with sharded RDBMs stores. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup RDBStore + * @author Aaron Schulz + */ + +/** + * Class representing a single partition of a virtual DB table + * + * @ingroup RDBStore + * @since 1.20 + */ +class ExternalRDBStoreTablePartition extends RDBStoreTablePartition { + /** @var LoadBalancer */ + protected $lb; + + /** + * @param $rdbStore ExternalRDBStore + * @param $table string Virtual table name + * @param $index integer Partition index + * @param $key string Shard key column name + * @param $value array Shard key column value + * @param $wiki string Wiki ID + */ + public function __construct( + ExternalRDBStore $rdbStore, $table, $index, $key, $value, $wiki + ) { + $this->rdbStore = $rdbStore; + $this->lb = $rdbStore->getClusterLBForIndex( $index ); + $this->vTable = (string)$table; + $this->pTable = "{$table}__{$rdbStore->formatShardIndex( $index )}__{$key}"; + $this->key = (string)$key; + $this->value = (string)$value; + $this->wiki = (string)$wiki; + } + + /** + * @see RDBStoreTablePartition + * @return DatabaseBase + */ + public function getSlaveDB() { + $conn = $this->lb->getConnection( DB_SLAVE, array(), $this->wiki ); + if ( $this->rdbStore->hasTransaction() ) { + $conn->setFlag( DBO_TRX ); // wrap queries in one transaction + $this->rdbStore->registerConnTrxInternal( $conn ); + } else { + $conn->clearFlag( DBO_TRX ); // auto-commit by default + } + return $conn; + } + + /** + * @see RDBStoreTablePartition + * @return DatabaseBase + */ + public function getMasterDB() { + $conn = $this->lb->getConnection( DB_MASTER, array(), $this->wiki ); + if ( $this->rdbStore->hasTransaction() ) { + if ( $this->rdbStore->getTrxIdXAInternal() ) { // XA transaction + $info = $this->lb->parentInfo(); // DB cluster info + $this->rdbStore->connStartXAInternal( $conn, $info['id'] ); + } else { // non-XA transaction + $conn->setFlag( DBO_TRX ); // wrap queries in one transaction + $this->rdbStore->registerConnTrxInternal( $conn ); + } + } else { + 
$conn->clearFlag( DBO_TRX ); // auto-commit by default + } + return $conn; + } +} diff --git a/includes/rdbstore/ExternalRDBStoreTrxJournal.php b/includes/rdbstore/ExternalRDBStoreTrxJournal.php new file mode 100644 index 000000000000..6d84c3698228 --- /dev/null +++ b/includes/rdbstore/ExternalRDBStoreTrxJournal.php @@ -0,0 +1,278 @@ +<?php +/** + * This file deals with sharded RDBMs stores. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup RDBStore + * @author Aaron Schulz + */ + +/** + * Class that helps with cross-shard commits and anomaly correction in external RDB stores. + * This logs the transaction ID and affected servers before the PREPARE/COMMIT phases. + * Distributed transactions that use this class should essentially work as follows: + * - a) The coordinator prepare transaction journal is updated + * - b) Changes are proposed on the relevant DBs, starting XA transactions + * - c) The coordinator commit transaction journal is updated + * - d) Changes are prepared and committed on the relevant DBs, ending XA transactions + * These log files are deleted when everything is committed successfully. Old log files are + * assumed to correspond to failed transactions and are used to repair the affected DB rows. 
+ *
+ * @ingroup RDBStore
+ * @since 1.20
+ */
+class ExternalRDBStoreTrxJournal {
+	/** @var ExternalRDBStore */
+	protected $rdbStore;
+
+	/** @var string File system path of the directory that holds the .tlog files */
+	protected $directory;
+	/** @var string Global transaction ID (timestamped UUID); null when not in a transaction */
+	protected $trxId;
+	/** @var integer One of the self::PHASE_* constants */
+	protected $phase = self::PHASE_READY;
+
+	const PHASE_READY = 1; // not in a transaction
+	const PHASE_PREPARING = 2; // ready for PREPAREs
+	const PHASE_COMMITTING = 3; // agreement on all changes, ready for COMMITs
+
+	const STALE_SEC = 900; // how old a .tlog file must be to be "stale"
+	const CHECK_SEC = 60; // period for checking for stale .tlog files
+
+	/**
+	 * The journal files live in a per-store sub-directory of the temp directory.
+	 *
+	 * @param $rdbStore ExternalRDBStore
+	 */
+	public function __construct( ExternalRDBStore $rdbStore ) {
+		$this->rdbStore = $rdbStore;
+		$this->directory = wfTempDir() . '/mw-rdbstore/' . rawurlencode( $rdbStore->getName() );
+	}
+
+	/**
+	 * Function to be called right before transaction PREPARE.
+	 * This writes the "<trx ID>.prepare.tlog" file listing the affected clusters.
+	 *
+	 * @param $trxId string Unique global transaction ID
+	 * @param $clusters array List of DB cluster names for each server
+	 * @return void
+	 * @throws MWException If a transaction is already in progress or the log cannot be written
+	 */
+	public function onPrePrepare( $trxId, array $clusters ) {
+		if ( $this->phase !== self::PHASE_READY ) {
+			throw new MWException( "Transaction alreay in progress." );
+		}
+		$this->trxId = $trxId;
+		// Log the PREPARE attempt to a file in /tmp space
+		$data = serialize( array( 'clusters' => $clusters ) );
+		wfProfileIn( __METHOD__ . '-log' );
+		wfMkdirParents( $this->directory ); // create dir if needed
+		$bytes = file_put_contents( "{$this->directory}/{$this->trxId}.prepare.tlog", $data );
+		wfProfileOut( __METHOD__ . '-log' );
+		if ( $bytes !== strlen( $data ) ) { // also catches a FALSE return value
+			throw new MWException( "Could not write to '{$this->trxId}.prepare.tlog' file." );
+		}
+		wfDebugLog( "RDBStoreTrx", "{$this->rdbStore->getName()}\tPREPARE\t{$this->trxId}\n" );
+		$this->phase = self::PHASE_PREPARING;
+	}
+
+	/**
+	 * Function to be called right before transaction COMMIT.
+	 * This copies the prepare log to "<trx ID>.commit.tlog" to record the COMMIT decision.
+	 *
+	 * @return void
+	 * @throws MWException If onPrePrepare() was not called or the log cannot be created
+	 */
+	public function onPreCommit() {
+		if ( $this->phase !== self::PHASE_PREPARING ) {
+			throw new MWException( "Transaction not prepared." );
+		}
+		// Log the COMMIT decision to a file in /tmp space
+		wfProfileIn( __METHOD__ . '-log' );
+		$ok = $this->fsyncCopy(
+			"{$this->directory}/{$this->trxId}.prepare.tlog",
+			"{$this->directory}/{$this->trxId}.commit.tlog"
+		);
+		wfProfileOut( __METHOD__ . '-log' );
+		if ( !$ok ) {
+			throw new MWException( "Could not create '{$this->trxId}.commit.tlog' file." );
+		}
+		wfDebugLog( "RDBStoreTrx", "{$this->rdbStore->getName()} COMMIT {$this->trxId}\n" );
+		$this->phase = self::PHASE_COMMITTING;
+	}
+
+	/**
+	 * Function to be called right after transaction COMMIT.
+	 * This removes both log files and returns the journal to the ready state.
+	 *
+	 * @return void
+	 * @throws MWException If onPreCommit() was not called
+	 */
+	public function onPostCommit() {
+		if ( $this->phase !== self::PHASE_COMMITTING ) {
+			throw new MWException( "Transaction is not ready to commit." );
+		}
+		wfProfileIn( __METHOD__ . '-log' );
+		unlink( "{$this->directory}/{$this->trxId}.prepare.tlog" ); // not needed anymore
+		unlink( "{$this->directory}/{$this->trxId}.commit.tlog" ); // not needed anymore
+		wfProfileOut( __METHOD__ . '-log' );
+		$this->trxId = null;
+		$this->phase = self::PHASE_READY;
+	}
+
+	/**
+	 * Copy a file; on non-Windows systems this goes through "dd conv=fsync".
+	 *
+	 * @param $src string Source file path
+	 * @param $dst string Destination file path
+	 * @return bool Success
+	 */
+	protected function fsyncCopy( $src, $dst ) {
+		if ( !wfIsWindows() ) { // *nix
+			$encSrc = wfEscapeShellArg( $src );
+			$encDst = wfEscapeShellArg( $dst );
+			$retVal = 1;
+			wfShellExec( "dd if={$encSrc} of={$encDst} conv=fsync", $retVal );
+			return ( $retVal == 0 );
+		} else { // windows
+			return copy( $src, $dst );
+		}
+	}
+
+	/**
+	 * Resolve stale .tlog files, but at most once every CHECK_SEC seconds.
+	 * The time of the last check is read from the cache, falling back to
+	 * the modification time of the "collect" lock file.
+	 *
+	 * @return integer Number of limbo transactions resolved
+	 */
+	public function periodicLogCheck() {
+		wfProfileIn( __METHOD__ );
+		try {
+			$cache = ObjectCache::newAccelerator( array() ); // local server cache
+		} catch ( MWException $e ) {
+			$cache = new EmptyBagOStuff();
+		}
+		$key = "rdbstore-collect-{$this->rdbStore->getName()}";
+		$time = $cache->get( $key );
+		if ( !is_int( $time ) ) { // cache miss
+			wfSuppressWarnings();
+			$time = filemtime( "{$this->directory}/collect" ); // false if file doesn't exist
+			wfRestoreWarnings();
+			$cache->set( $key, (int)$time, 86400 );
+		}
+		if ( $time < ( time() - self::CHECK_SEC ) ) {
+			$cache->set( $key, time(), 86400 );
+			wfDebug( __METHOD__ . ": Checking for stale tlog files.\n" );
+			// resolveLimboTransactions() gives FALSE if it could not get the lock;
+			// this method is documented to return an integer, so treat that as 0.
+			$count = (int)$this->resolveLimboTransactions( 10 );
+		} else {
+			wfDebug( __METHOD__ . ": Skipping check for stale tlog files.\n" );
+			$count = 0;
+		}
+		wfProfileOut( __METHOD__ );
+		return $count;
+	}
+
+	/**
+	 * Find stale .tlog files lying around in /tmp and finish the corresponding
+	 * DB transactions. The .tlog files are removed once the transactions resolve.
+	 *
+	 * @param $maxTrxCount integer Maximum transactions to resolve (0 for no limit)
+	 * @return integer|bool Number of limbo transactions resolved; false if the
+	 *   lock file could not be opened or is locked by another process
+	 * @throws Exception Any exception hit while resolving (after the lock is released)
+	 */
+	public function resolveLimboTransactions( $maxTrxCount = 0 ) {
+		wfProfileIn( __METHOD__ );
+
+		// Make sure only one process on this server is doing this...
+		wfMkdirParents( $this->directory ); // create dir if needed
+		$lockFileHandle = fopen( "{$this->directory}/collect", 'w' ); // open & touch
+		if ( !$lockFileHandle ) {
+			wfProfileOut( __METHOD__ );
+			return false; // bail out
+		} elseif ( !flock( $lockFileHandle, LOCK_EX | LOCK_NB ) ) {
+			fclose( $lockFileHandle );
+			wfProfileOut( __METHOD__ );
+			return false; // someone else probably beat us
+		}
+
+		$e = null; // exception
+		$countDone = 0; // count fixed
+		try {
+			$logs = array(); // (trx ID => {clusters,step})
+			// Get all of the old .tlog files lying around...
+			$dirHandle = opendir( $this->directory );
+			if ( $dirHandle ) {
+				$cutoff = time() - self::STALE_SEC;
+				$fileRegex = '/^(\d{14}-[0-9a-f]{32})\.(commit|prepare)\.tlog$/';
+				// Compare against FALSE explicitly; a file named "0" is falsy
+				while ( ( $file = readdir( $dirHandle ) ) !== false ) {
+					$m = array(); // matches
+					if ( !preg_match( $fileRegex, $file, $m ) ) {
+						continue; // sanity
+					} elseif ( filemtime( "{$this->directory}/{$file}" ) > $cutoff ) {
+						continue; // wait; transaction may still be in progress
+					}
+					list( /*all*/, $trxId, $step ) = $m;
+					if ( $step === 'commit' || !isset( $logs[$trxId] ) ) { // commit has precedence
+						$info = unserialize( file_get_contents( "{$this->directory}/{$file}" ) );
+						if ( is_array( $info ) ) {
+							$logs[$trxId] = $info + array( 'step' => $step );
+						}
+					}
+				}
+				closedir( $dirHandle );
+			}
+			wfDebugLog( 'RDBStore', "Found " . count( $logs ) . " limbo transaction(s).\n" );
+			// Run through transactions that failed and resolve them...
+			foreach ( $logs as $trxId => $trxInfo ) {
+				if ( $this->finishTransaction( $trxId, $trxInfo ) ) {
+					++$countDone; // another transaction resolved
+					if ( file_exists( "{$this->directory}/{$trxId}.prepare.tlog" ) ) {
+						unlink( "{$this->directory}/{$trxId}.prepare.tlog" );
+					}
+					if ( file_exists( "{$this->directory}/{$trxId}.commit.tlog" ) ) {
+						unlink( "{$this->directory}/{$trxId}.commit.tlog" );
+					}
+					if ( $maxTrxCount && $countDone >= $maxTrxCount ) {
+						break; // only handle so many tlog files
+					}
+				}
+			}
+			wfDebugLog( 'RDBStore', "Repaired $countDone limbo transaction(s).\n" );
+		} catch ( Exception $e ) {}
+
+		// Release locks so other processes can poll these files...
+		flock( $lockFileHandle, LOCK_UN );
+		fclose( $lockFileHandle );
+
+		wfProfileOut( __METHOD__ );
+		if ( $e instanceof Exception ) {
+			throw $e; // throw back any exception
+		}
+
+		// Report what was actually resolved, not merely what was found
+		return $countDone;
+	}
+
+	/**
+	 * Commit or roll back a limbo transaction on every cluster it touched.
+	 * The transaction is committed if a COMMIT decision was logged and rolled back otherwise.
+	 *
+	 * @param $trxId string
+	 * @param $trxInfo array Map with the clusters used ('clusters') and the last log step ('step')
+	 * @return boolean
+	 */
+	protected function finishTransaction( $trxId, array $trxInfo ) {
+		foreach ( $trxInfo['clusters'] as $cluster ) { // for each cohort
+			$lb = wfGetLBFactory()->getExternalLB( $cluster ); // LB for cohort
+			$dbw = $lb->getConnection( DB_MASTER ); // DB master
+			$dbw->clearFlag( DBO_TRX ); // don't wrap in BEGIN
+			if ( $this->rdbStore->connExistsXAInternal( $dbw, $trxId ) ) { // still unresolved?
+				if ( $trxInfo['step'] === 'commit' ) { // COMMIT decision made
+					$this->rdbStore->connCommitXAInternal( $dbw, '2PC', $trxId );
+					wfDebugLog( 'RDBStore', "Commited transaction '$trxId'.\n" );
+				} else { // no COMMIT decision made; ROLLBACK
+					$this->rdbStore->connRollbackXAInternal( $dbw, $trxId );
+					wfDebugLog( 'RDBStore', "Rolled back transaction '$trxId'.\n" );
+				}
+			} else {
+				wfDebugLog( 'RDBStore', "Could not find transaction '$trxId'.\n" );
+			}
+		}
+		return true;
+	}
+}
diff --git a/includes/rdbstore/LocalRDBStore.php b/includes/rdbstore/LocalRDBStore.php
new file mode 100644
index 000000000000..0e4ec6574e86
--- /dev/null
+++ b/includes/rdbstore/LocalRDBStore.php
@@ -0,0 +1,128 @@
+<?php
+/**
+ * This file deals with local RDBMs stores.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup RDBStore
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class representing a simple, non-external, DB storage system.
+ * Tables are not sharded, and only the wiki cluster is used.
+ *
+ * @ingroup RDBStore
+ * @since 1.20
+ */
+class LocalRDBStore extends RDBStore {
+	/** @var LoadBalancer */
+	protected $lb;
+
+	/** @var string Wiki ID */
+	protected $wiki;
+
+	/**
+	 * @param $options array Supports:
+	 *   - wiki : Wiki ID; the current wiki is used if this is false or not set
+	 */
+	public function __construct( array $options ) {
+		// Treat a missing 'wiki' option like FALSE rather than raising a notice
+		$this->wiki = ( isset( $options['wiki'] ) && $options['wiki'] !== false )
+			? $options['wiki']
+			: wfWikiID();
+		// Use a separate connection so this handles transactions like external RDB stores
+		$this->lb = wfGetLBFactory()->newMainLB( $this->wiki );
+	}
+
+	/**
+	 * @see RDBStore::getName()
+	 * @return string "local-" followed by the wiki ID
+	 */
+	public function getName() {
+		return "local-{$this->wiki}";
+	}
+
+	/**
+	 * @see RDBStore::isInternal()
+	 * @return bool Always true
+	 */
+	public function isInternal() {
+		return true;
+	}
+
+	/**
+	 * @see RDBStore::isPartitioned()
+	 * @return bool Always false
+	 */
+	public function isPartitioned() {
+		return false;
+	}
+
+	/**
+	 * @see RDBStore::beginOutermost()
+	 * @see DatabaseBase::begin()
+	 * @return void
+	 */
+	protected function beginOutermost() {
+		$this->getMasterDB()->begin();
+	}
+
+	/**
+	 * @see RDBStore::finishOutermost()
+	 * @see DatabaseBase::commit()
+	 * @return void
+	 */
+	protected function finishOutermost() {
+		$this->getMasterDB()->commit();
+	}
+
+	/**
+	 * @see RDBStore::doGetPartition()
+	 * @return LocalRDBStoreTablePartition
+	 * @throws DBUnexpectedError If $wiki is not the wiki of this store
+	 */
+	protected function doGetPartition( $table, $column, $value, $wiki ) {
+		if ( $wiki !== $this->wiki ) {
+			throw new DBUnexpectedError( "Wiki ID '$wiki' does not match '{$this->wiki}'." );
+		}
+		return new LocalRDBStoreTablePartition( $this, $table, $column, $value, $wiki );
+	}
+
+	/**
+	 * @see RDBStore::doGetAllPartitions()
+	 * @return Array List of LocalRDBStoreTablePartition objects (always exactly one)
+	 * @throws DBUnexpectedError If $wiki is not the wiki of this store
+	 */
+	protected function doGetAllPartitions( $table, $column, $wiki ) {
+		if ( $wiki !== $this->wiki ) {
+			throw new DBUnexpectedError( "Wiki ID '$wiki' does not match '{$this->wiki}'." );
+		}
+		// No shard column value; the single "partition" is the whole table
+		return array( new LocalRDBStoreTablePartition( $this, $table, $column, null, $wiki ) );
+	}
+
+	/**
+	 * Outside callers should generally not need this and should avoid using it
+	 *
+	 * @return DatabaseBase
+	 */
+	public function getSlaveDB() {
+		return $this->lb->getConnection( DB_SLAVE, array(), $this->wiki );
+	}
+
+	/**
+	 * Outside callers should generally not need this and should avoid using it
+	 *
+	 * @return DatabaseBase
+	 */
+	public function getMasterDB() {
+		return $this->lb->getConnection( DB_MASTER, array(), $this->wiki );
+	}
+}
diff --git a/includes/rdbstore/LocalRDBStoreTablePartition.php b/includes/rdbstore/LocalRDBStoreTablePartition.php
new file mode 100644
index 000000000000..cad264001b40
--- /dev/null
+++ b/includes/rdbstore/LocalRDBStoreTablePartition.php
@@ -0,0 +1,64 @@
+<?php
+/**
+ * This file deals with local RDBMs stores.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup RDBStore
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class representing a single partition of a virtual DB table.
+ * This is just a regular table on the non-external main DB.
+ *
+ * @ingroup RDBStore
+ * @since 1.20
+ */
+class LocalRDBStoreTablePartition extends RDBStoreTablePartition {
+	/**
+	 * @param $rdbStore LocalRDBStore
+	 * @param $table string Table name (used as both the virtual and the partition table name)
+	 * @param $key string Shard key column name
+	 * @param $value string|null Shard key column value; NULL if this covers the entire table
+	 * @param $wiki string Wiki ID
+	 */
+	public function __construct( LocalRDBStore $rdbStore, $table, $key, $value, $wiki ) {
+		$this->rdbStore = $rdbStore;
+		$this->vTable = (string)$table;
+		$this->pTable = (string)$table;
+		$this->key = (string)$key;
+		// Keep NULL as NULL; casting it would turn "no value" into the value ''
+		$this->value = ( $value === null ) ? null : (string)$value;
+		$this->wiki = (string)$wiki;
+	}
+
+	/**
+	 * @see RDBStoreTablePartition::getSlaveDB()
+	 * @return DatabaseBase
+	 */
+	public function getSlaveDB() {
+		return $this->rdbStore->getSlaveDB();
+	}
+
+	/**
+	 * @see RDBStoreTablePartition::getMasterDB()
+	 * @return DatabaseBase
+	 */
+	public function getMasterDB() {
+		return $this->rdbStore->getMasterDB();
+	}
+}
diff --git a/includes/rdbstore/RDBStore.php b/includes/rdbstore/RDBStore.php
new file mode 100644
index 000000000000..aa7aaddddc26
--- /dev/null
+++ b/includes/rdbstore/RDBStore.php
@@ -0,0 +1,354 @@
+<?php
+/**
+ * @defgroup RDBStore RDBStore
+ *
+ * This file deals with shardable RDBMs stores.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup RDBStore
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class representing a relational DB storage system.
+ * Callers access tables as if they were horizontally partitioned,
+ * which may actually be the case for large, external, RDB stores.
+ *
+ * Partitioning for each table is based on a single non-NULL column.
+ * Only the column value will be used to determine partitions, not the column name.
+ *
+ * Read queries wrapped in begin() and finish() will use the default isolation level of the DB.
+ * All DB servers should be configured to use REPEATABLE-READ by default (like stock MySQL) so
+ * that separate read queries in a transaction will be consistent with a single snapshot in time.
+ * Queries performed on different DBs will see different point-in-time snapshots for each DB,
+ * since each will have its own local transaction.
+ *
+ * Note that cross-DB queries are not supported in this class. Tables should be sharded such
+ * that all queries only need to affect a single shard. If this is not possible, one can:
+ *   - a) Have a canonical and a duplicate version of a table, each sharded in different ways.
+ *        For example, the former being sharded on page ID and the latter on user ID.
+ *        Read queries for page X or user Y could thus be routed to a single shard.
+ *        A drawback is that this essentially doubles the physical size of the data set.
+ *   - b) Have a canonical and an index version of a table, each sharded in different ways.
+ *        For example, the former being sharded on page ID and the latter on user ID. The index
+ *        table duplicates the main table, except only having columns used in WHERE clauses.
+ *        Read queries for page X would hit a canonical table shard, while read queries for user
+ *        Y would hit an index shard and then fetch the full rows from the main table shards.
+ *        This takes up less space than (a) and can reduce the chance for cross-shard updates.
+ *        A drawback is that user=Y queries require O(N) queries for result sets of size N.
+ *        This should only be done if N is very small or if it is heavily cached (memcached).
+ *   - c) Have a duplicate time-based rolling table stored in the local DB, if possible.
+ *        For example, if a thread table is sharded on thread ID, one could also store the last
+ *        few months of rows in a local DB table. Since the table is not sharded, anything that
+ *        is properly indexed could be queried and JOINs could be done with local tables.
+ *        Data inconsistencies are self-correcting as all rows are eventually rotated out.
+ * Cross-DB transactions may not be atomic, so options (a) and (b) can have permanent anomalies.
+ * Reads used for writes should always be done on the canonical table shards as good practice.
+ * For (a), if rows are only inserted and they have an indexed creation timestamp field, it's
+ * possible to synchronize denormalized data with a repair script. For (b), if rows are only
+ * inserted/updated, they have an indexed creation timestamp field, and all of the "index table"
+ * columns are immutable, then it's possible to synchronize denormalized data with a repair script.
+ *
+ * Cross-DB transactions may not actually be atomic. When COMMITs are issued to each DB,
+ * it is possible for some COMMITs to succeed while others fail, though this should be rare.
+ * If this is problematic one might be able to do the following:
+ *   - a) Use certain MVCC patterns. For example, if a local DB row needs to be created,
+ *        referencing several RDB store blobs, one can commit all the blobs first, using
+ *        UUIDs as foreign keys, updating the local DB only if all the blobs were committed.
+ *        If the row and blobs need to be updated, new blobs with different UUIDs can be used.
+ *        Failures just leave unreferenced garbage in the worst case scenario.
+ *   - b) Partition tables such that related rows of different tables map to the same DB.
+ *        This is useful if, for example, a table is partitioned on page ID but a table that
+ *        stores aggregate data by page ID is needed. The latter can simply be partitioned
+ *        by page ID, ensuring that rows for either table for any page X will be on the same DB.
+ * Sharded stores can be configured to use Two-Phase-Commit (2PC) for cross-DB transactions.
+ * PostgreSQL8 supports 2PC and MySQL 5 also has 2PC Open XA statements (useless in MySQL 5.X,
+ * planned to be fixed in MySQL 6.1). Limbo transactions are periodically resolved by the same
+ * application servers that initiated them. If they are unable to do so, then the transactions
+ * have to be resolved manually or by a repair script. Limbo transactions hold persistent locks.
+ * @see http://dev.mysql.com/doc/refman/5.0/en/xa-states.html
+ * @see http://www.postgresql.org/docs/9.1/static/sql-prepare-transaction.html
+ *
+ * When callers use begin() and finish(), the data might not be committed until later if another
+ * caller already called begin() and has yet to call finish(). Output to the user can be buffered
+ * in the normal means (e.g. OutputPage) in a way that blindly assumes it has been committed. If
+ * the commit fails, a DBError exception will be thrown and the buffered output will be tossed.
+ * Things like emails or job insertion, which need to happen only after successful commit, can be
+ * made to happen in a callback function. This callback can be passed to finish(), and will only
+ * be triggered after successful commit.
+ *
+ * Example usage from an extension:
+ * @code
+ *     $rdbs = RDBStoreGroup::singleton()->getForTable( 'comments' );
+ *     $tp = $rdbs->getPartition( 'comments', 'thread', 501 );
+ *     $res = $tp->select( DB_SLAVE, '*', array( 'thread' => 501, 'comment' => 11 ), __METHOD__ );
+ * @endcode
+ *
+ * @code
+ *     $rdbs = RDBStoreGroup::singleton()->getForTable( 'user_info' );
+ *     $rdbs->begin();
+ *     $rdbs->getPartition( 'user_info', 'user', $row['user'] )->insert( $row, __METHOD__ );
+ *     $rdbs->finish();
+ * @endcode
+ *
+ * @code
+ *     $rdbs = RDBStoreGroup::singleton()->getForTableGroup( 'ext-myextension' );
+ *     $rdbs->begin();
+ *     // This table is sharded by user and duplicated to shard on event
+ *     $places = $rdbs->columnValueMap( $row, array( 'user', 'event' ) );
+ *     // Insert the subscription row into canonical and duplicate shards...
+ *     foreach ( $rdbs->getEachPartition( 'subscriptions', $places ) as $tp ) {
+ *         $tp->insert( $row, __METHOD__ );
+ *     }
+ *     $rdbs->finish();
+ * @endcode
+ *
+ * @code
+ *     $rdbs = RDBStoreGroup::singleton()->getForTableGroup( 'ext-myextension' );
+ *     $rdbs->begin();
+ *     // Insert the comment row into the canonical shard...
+ *     $rdbs->getPartition( 'comments', 'thread', $row['thread'] )->insert( $row, __METHOD__ );
+ *     // Insert a row that references the comment row into the user index shard...
+ *     $irow = $rdbs->columnValueMap( $row, array( 'user', 'thread', 'comment' ) );
+ *     $rdbs->getPartition( 'comments', 'user', $row['user'] )->insert( $irow, __METHOD__ );
+ *     // Update the stats for the thread...
+ *     $tp = $rdbs->getPartition( 'thread_stats', 'thread', $row['thread'] );
+ *     $tp->update( 'thread_stats', array(...), array( 'thread' => $row['thread'] ), __METHOD__ );
+ *     $rdbs->finish( function() use ( $row ) {
+ *         MyExtension::enqueueNotifyJobs( $row );
+ *     } );
+ * @endcode
+ *
+ * @ingroup RDBStore
+ * @since 1.20
+ */
+abstract class RDBStore {
+	/** @var integer Number of begin() calls not yet matched by finish() */
+	protected $trxDepth = 0;
+	/** @var float UNIX timestamp of when the outermost transaction started */
+	protected $trxStartTime;
+	/** @var Array List of callbacks to run after the outermost COMMIT */
+	protected $postCommitCallbacks = array();
+
+	/**
+	 * @param $options array
+	 */
+	public function __construct( array $options ) {}
+
+	/**
+	 * @return string Name of this store
+	 */
+	abstract public function getName();
+
+	/**
+	 * @return bool Whether this store is the primary DB of a wiki
+	 */
+	abstract public function isInternal();
+
+	/**
+	 * @return bool Tables are actually partitioned in the store
+	 */
+	abstract public function isPartitioned();
+
+	/**
+	 * Check if the store is currently in a DB transaction.
+	 * Outside callers should generally not need this and should avoid using it.
+	 *
+	 * @return bool
+	 */
+	final public function hasTransaction() {
+		return ( $this->trxDepth > 0 );
+	}
+
+	/**
+	 * Increment the transaction counter and begin a transaction if the counter was zero
+	 *
+	 * @return bool DB transaction actually started
+	 * @throws DBError
+	 */
+	final public function begin() {
+		if ( $this->trxDepth == 0 ) {
+			$this->beginOutermost(); // BEGIN for the outermost transaction
+			$this->trxStartTime = microtime( true );
+		}
+		++$this->trxDepth; // increment the transaction counter in any case
+		wfDebug( __METHOD__ . ": transaction nesting level now {$this->trxDepth}.\n" );
+		return ( $this->trxDepth == 1 );
+	}
+
+	/**
+	 * @see RDBStore::begin()
+	 * @return void
+	 */
+	abstract protected function beginOutermost();
+
+	/**
+	 * Decrement the transaction counter and commit the transaction if the counter is zero.
+	 *
+	 * Callers can register post-commit callback functions (including closures) here.
+	 * When the transaction is committed, any post-commit callback functions are called.
+	 * Every callback is run even if an earlier one throws; the last exception
+	 * thrown by a callback (if any) is re-thrown once all of them have been run.
+	 *
+	 * @param $callback Closure Optional function to call after database COMMIT
+	 * @return bool DB transaction actually committed
+	 * @throws DBError
+	 * @throws MWException If there is no matching begin() call
+	 */
+	final public function finish( Closure $callback = null ) {
+		if ( $this->trxDepth <= 0 ) {
+			throw new MWException( "Detected unmatched finish() call." );
+		}
+		if ( $callback !== null ) {
+			// Register post-commit callback given by the caller
+			$this->postCommitCallbacks[] = $callback;
+		}
+		if ( $this->trxDepth == 1 ) {
+			$this->finishOutermost(); // COMMIT for the outermost transaction
+		}
+		--$this->trxDepth; // decrement the transaction counter in any case
+		wfDebug( __METHOD__ . ": transaction nesting level now {$this->trxDepth}.\n" );
+		// Once the transaction level is 0, anything must have been committed.
+		// Trigger any post-commit callbacks (which assume the data is committed).
+		if ( $this->trxDepth == 0 ) {
+			$e = null; // any exception
+			$callbacks = $this->postCommitCallbacks;
+			$this->postCommitCallbacks = array(); // recursion guard
+			foreach ( $callbacks as $callback ) {
+				try {
+					$callback();
+				} catch ( Exception $e ) {}
+			}
+			if ( $e instanceof Exception ) {
+				throw $e; // throw back the last exception
+			}
+		}
+		return ( $this->trxDepth == 0 );
+	}
+
+	/**
+	 * @see RDBStore::finish()
+	 * @return void
+	 */
+	abstract protected function finishOutermost();
+
+	/**
+	 * Get an object representing a shard of a virtual DB table.
+	 * If this store is not partitioned, this returns an object for the entire table.
+	 *
+	 * Each table is canonically sharded on one column key, and may possibly be
+	 * denormalized and sharded on additional column keys (e.g. thread ID, user ID).
+	 *
+	 * @param $table string Virtual table name
+	 * @param $column string Shard key column name
+	 * @param $value string Shard key column value
+	 * @param $wiki string Wiki ID; defaults to the current wiki
+	 * @return RDBStoreTablePartition
+	 * @throws MWException
+	 */
+	final public function getPartition( $table, $column, $value, $wiki = false ) {
+		$wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+		if ( !isset( $column ) || !isset( $value ) ) {
+			throw new MWException( "Missing table shard column or value." );
+		}
+		return $this->doGetPartition( $table, $column, $value, $wiki );
+	}
+
+	/**
+	 * @see RDBStore::getPartition()
+	 * @return RDBStoreTablePartition
+	 */
+	abstract protected function doGetPartition( $table, $column, $value, $wiki );
+
+	/**
+	 * Get a list of objects representing all shards of a virtual DB table.
+	 * If this store is not partitioned, the list has one object for the entire table.
+	 *
+	 * Each table is canonically sharded on one column key, and may possibly be
+	 * denormalized and sharded on additional column keys (e.g. thread ID, user ID).
+	 *
+	 * @param $table string Virtual table name
+	 * @param $column string Shard key column name
+	 * @param $wiki string Wiki ID; defaults to the current wiki
+	 * @return Array List of RDBStoreTablePartition objects
+	 * @throws MWException
+	 */
+	final public function getAllPartitions( $table, $column, $wiki = false ) {
+		$wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+		if ( !isset( $column ) ) {
+			throw new MWException( "Missing table shard column." );
+		}
+		return $this->doGetAllPartitions( $table, $column, $wiki );
+	}
+
+	/**
+	 * @see RDBStore::getAllPartitions()
+	 * @return Array List of RDBStoreTablePartition objects
+	 */
+	abstract protected function doGetAllPartitions( $table, $column, $wiki );
+
+	/**
+	 * Get the column to value map for a row for a given list of columns.
+	 * All these columns must be present in the row or an error will be thrown.
+	 * The map is ordered like $columns, so it can be given to getEachPartition().
+	 *
+	 * @param $row Row|array
+	 * @param $columns array List of column names
+	 * @return Array Map of column names to values
+	 * @throws MWException
+	 */
+	final public function columnValueMap( $row, array $columns ) {
+		$row = (array)$row;
+		$map = array();
+		$diff = array(); // columns not in the row
+		// $columns is a list of names; look each one up as a key of the row
+		foreach ( $columns as $column ) {
+			if ( array_key_exists( $column, $row ) ) {
+				$map[$column] = $row[$column];
+			} else {
+				$diff[] = $column;
+			}
+		}
+		if ( count( $diff ) ) {
+			throw new MWException( "Row is missing column(s) " . implode( ', ', $diff ) );
+		}
+		return $map;
+	}
+
+	/**
+	 * Get a list of objects representing shards of a virtual DB table.
+	 * A shard is returned for each (shard column, column value) tuple given if the
+	 * store is partitioned, and for the first tuple if the store is not partitioned.
+	 *
+	 * This is for tables that are canonically sharded on one column key, and also
+	 * denormalized and sharded on additional column keys (e.g. thread ID, user ID).
+	 *
+	 * When writing to multiple shards, callers should always do so in a certain order.
+	 * For example, if a table is sharded on thread ID and denormalized to shard on user ID,
+	 * updates should first write to the thread shard and then to the user shard. This reduces
+	 * the chance for deadlocks arising from transaction locks on rows. If P1 locks rows on
+	 * thread partition A, and blocks on P2 for rows of user partition B, it can't be the case
+	 * that P2 is also blocked on P1 for rows of thread partition A (due to the write order).
+	 *
+	 * @param $table string Virtual table name
+	 * @param $map array Map of shard column names to values from RDBStore::columnValueMap()
+	 * @param $wiki string Wiki ID; defaults to the current wiki
+	 * @return Array Map of column names to RDBStoreTablePartition objects (same order as $map)
+	 * @throws MWException
+	 */
+	final public function getEachPartition( $table, array $map, $wiki = false ) {
+		$wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+		$partitions = array();
+		foreach ( $map as $column => $value ) {
+			if ( !is_string( $column ) ) {
+				throw new MWException( "Invalid column name." );
+			} elseif ( !is_scalar( $value ) ) {
+				throw new MWException( "Invalid value for column '$column'." );
+			}
+			$partitions[$column] = $this->getPartition( $table, $column, $value, $wiki );
+			if ( !$this->isPartitioned() ) {
+				break; // the data is all on one table
+			}
+		}
+		return $partitions;
+	}
+}
diff --git a/includes/rdbstore/RDBStoreGroup.php b/includes/rdbstore/RDBStoreGroup.php
new file mode 100644
index 000000000000..150f7b1695c2
--- /dev/null
+++ b/includes/rdbstore/RDBStoreGroup.php
@@ -0,0 +1,183 @@
+<?php
+/**
+ * This file deals with sharded database stores.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup RDBStore
+ * @author Aaron Schulz
+ */
+
+/**
+ * Factory class for getting RDBStore objects
+ *
+ * @ingroup RDBStore
+ * @since 1.20
+ */
+class RDBStoreGroup {
+	/** @var RDBStoreGroup */
+	protected static $instance = null;
+
+	/** @var Array */
+	protected $storeConfig = array(); // (store name => config array)
+	/** @var Array */
+	protected $tableGroups = array(); // (group name => table list)
+	/** @var Array */
+	protected $tableSyncInfo = array(); // (table name => config array)
+
+	/** @var Array */
+	protected $storedTables = array(); // (wiki ID => table name => store name)
+
+	/** @var Array */
+	protected $extStoreInstances = array(); // (store name => ExternalRDBStore)
+	/** @var Array */
+	protected $intStoreInstances = array(); // (wiki ID => LocalRDBStore)
+
+	protected function __construct() {}
+
+	/**
+	 * Get the singleton instance, creating it from the global variables if needed
+	 *
+	 * @return RDBStoreGroup
+	 */
+	public static function singleton() {
+		if ( self::$instance === null ) {
+			self::$instance = new self();
+			self::$instance->initFromGlobals();
+		}
+		return self::$instance;
+	}
+
+	/**
+	 * Destroy the singleton instance
+	 *
+	 * @return void
+	 */
+	public static function destroySingleton() {
+		self::$instance = null;
+	}
+
+	/**
+	 * Register db stores from the global variables
+	 *
+	 * @return void
+	 */
+	protected function initFromGlobals() {
+		global $wgRDBStores, $wgRDBStoredTables;
+
+		$this->storeConfig = $wgRDBStores;
+		$this->storedTables = $wgRDBStoredTables;
+		// Let extensions register table groups and denormalized tables
+		wfRunHooks( 'RDBStoreTableSchemaInfo',
+			array( &$this->tableGroups, &$this->tableSyncInfo ) );
+	}
+
+	/**
+	 * Get a DB store on the main cluster of a wiki.
+	 * This uses a multi-singleton pattern to track transactions.
+	 *
+	 * @param $wiki string Wiki ID; defaults to the current wiki
+	 * @return LocalRDBStore
+	 */
+	public function getInternal( $wiki = false ) {
+		$wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+		if ( !isset( $this->intStoreInstances[$wiki] ) ) {
+			$this->intStoreInstances[$wiki] = new LocalRDBStore( array( 'wiki' => $wiki ) );
+		}
+		return $this->intStoreInstances[$wiki];
+	}
+
+	/**
+	 * Get an external DB store by name.
+	 * This uses a multi-singleton pattern to track transactions.
+	 *
+	 * @param $name string Storage group ID
+	 * @return ExternalRDBStore
+	 * @throws MWException If no store is configured under that name
+	 */
+	public function getExternal( $name ) {
+		if ( !isset( $this->extStoreInstances[$name] ) ) {
+			if ( !isset( $this->storeConfig[$name] ) ) {
+				throw new MWException( "No DB store defined with the name '$name'." );
+			}
+			$this->storeConfig[$name]['name'] = $name;
+			$this->extStoreInstances[$name] = new ExternalRDBStore( $this->storeConfig[$name] );
+		}
+		return $this->extStoreInstances[$name];
+	}
+
+	/**
+	 * Get the DB store designated for a certain DB table.
+	 * A LocalRDBStore will be returned if one is not configured.
+	 *
+	 * @param $table string Table name
+	 * @param $wiki string Wiki ID; defaults to the current wiki
+	 * @return RDBStore
+	 */
+	public function getForTable( $table, $wiki = false ) {
+		$wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+		if ( isset( $this->storedTables[$wiki][$table] ) ) {
+			return $this->getExternal( $this->storedTables[$wiki][$table] );
+		} else {
+			return $this->getInternal( $wiki );
+		}
+	}
+
+	/**
+	 * Get the DB store designated for a certain DB table group.
+	 * A LocalRDBStore will be returned if one is not configured.
+	 *
+	 * @param $name string Group ID
+	 * @param $wiki string Wiki ID; defaults to the current wiki
+	 * @return RDBStore
+	 * @throws MWException If the group is undefined, empty, or spans several stores
+	 */
+	public function getForTableGroup( $name, $wiki = false ) {
+		$wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+		if ( isset( $this->tableGroups[$name] ) && count( $this->tableGroups[$name] ) ) {
+			$firstTable = reset( $this->tableGroups[$name] ); // first table
+			$store = $this->storeNameForTable( $firstTable, $wiki );
+			// Make sure all tables in this group are in a single store...
+			foreach ( $this->tableGroups[$name] as $table ) {
+				if ( $this->storeNameForTable( $table, $wiki ) !== $store ) {
+					$list = implode( ', ', $this->tableGroups[$name] );
+					throw new MWException( "Table(s) '$list' do not map to a single RDB store." );
+				}
+			}
+			// Pass the wiki along; otherwise the current wiki would always be used
+			return $this->getForTable( $firstTable, $wiki ); // store object for these tables
+		} else {
+			throw new MWException( "RDB store table group '$name' is undefined or empty." );
+		}
+	}
+
+	/**
+	 * Get the denormalization information registered for a table
+	 * via the 'RDBStoreTableSchemaInfo' hook.
+	 *
+	 * @param $table string Table name
+	 * @return Array|null Information for this table only; null if nothing was registered
+	 */
+	public function getTableSyncInfo( $table ) {
+		return isset( $this->tableSyncInfo[$table] ) ? $this->tableSyncInfo[$table] : null;
+	}
+
+	/**
+	 * @param $table string Table name
+	 * @param $wiki string Wiki ID
+	 * @return string|false Returns false if not an external store
+	 */
+	protected function storeNameForTable( $table, $wiki ) {
+		return isset( $this->storedTables[$wiki][$table] )
+			? $this->storedTables[$wiki][$table]
+			: false; // local
+	}
+}
diff --git a/includes/rdbstore/RDBStoreTablePartition.php b/includes/rdbstore/RDBStoreTablePartition.php
new file mode 100644
index 000000000000..582a0f6a5229
--- /dev/null
+++ b/includes/rdbstore/RDBStoreTablePartition.php
@@ -0,0 +1,255 @@
+<?php
+/**
+ * This file deals with sharded RDBMs stores.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
/**
 * Class representing a single partition of a virtual DB table.
 * If a shard column value is provided, queries are restricted
 * to those that apply to that value; otherwise, queries can be
 * made on the entire table partition.
 *
 * @ingroup RDBStore
 * @since 1.20
 */
abstract class RDBStoreTablePartition {
	/** @var RDBStore */
	protected $rdbStore;

	protected $wiki; // string; wiki ID
	protected $vTable; // string; virtual table name
	protected $pTable; // string; partition table name
	protected $key; // string; column name
	protected $value; // string|null; column value (null for the whole partition)

	/**
	 * @return string Wiki ID
	 */
	final public function getWiki() {
		return $this->wiki;
	}

	/**
	 * @return string Table name (e.g. "flaggedtemplates")
	 */
	final public function getVirtualTable() {
		return $this->vTable;
	}

	/**
	 * @return string Table name (e.g. "flaggedtemplates__030__ft_rev_id")
	 */
	final public function getPartitionTable() {
		return $this->pTable;
	}

	/**
	 * @return string Name of the column used to shard on (e.g. "ft_rev_id")
	 */
	final public function getPartitionKey() {
		return $this->key;
	}

	/**
	 * @return string|null Value of the shard column or NULL
	 */
	final public function getPartitionKeyValue() {
		return $this->value;
	}

	/**
	 * Use this function with usort() to order a list of partitions.
	 * This should be done for access patterns where several shards with the same shard
	 * column need to be written to. Doing writes in order reduces the possibility of deadlocks.
	 *
	 * This is static so that it can be passed to usort() as
	 * array( 'RDBStoreTablePartition', 'compare' ); calling it on an instance still works.
	 *
	 * @param RDBStoreTablePartition $a
	 * @param RDBStoreTablePartition $b
	 * @return integer (negative if a < b, 0 if a = b, positive if a > b)
	 */
	final public static function compare( RDBStoreTablePartition $a, RDBStoreTablePartition $b ) {
		return strcmp( $a->getPartitionTable(), $b->getPartitionTable() );
	}

	/**
	 * Similar to DatabaseBase::select() except the first argument is DB_SLAVE/DB_MASTER
	 *
	 * @see DatabaseBase::select()
	 * @param $index integer DB_SLAVE or DB_MASTER
	 * @param $vars Array|string
	 * @param $conds Array
	 * @param $fname string
	 * @param $options Array
	 * @return ResultWrapper
	 * @throws MWException
	 */
	final public function select( $index, $vars, array $conds, $fname, $options = array() ) {
		$this->assertKeyCond( $conds ); // sanity
		return $this->getDBByIndex( $index )
			->select( $this->pTable, $vars, $conds, $fname, $options );
	}

	/**
	 * Similar to DatabaseBase::selectRow() except the first argument is DB_SLAVE/DB_MASTER
	 *
	 * @see DatabaseBase::selectRow()
	 * @param $index integer DB_SLAVE or DB_MASTER
	 * @param $vars Array|string
	 * @param $conds Array
	 * @param $fname string
	 * @param $options Array
	 * @return object|bool Whatever DatabaseBase::selectRow() returns (a row object or false)
	 * @throws MWException
	 */
	final public function selectRow( $index, $vars, array $conds, $fname, $options = array() ) {
		$this->assertKeyCond( $conds ); // sanity
		return $this->getDBByIndex( $index )
			->selectRow( $this->pTable, $vars, $conds, $fname, $options );
	}

	/**
	 * Similar to DatabaseBase::selectField() except the first argument is DB_SLAVE/DB_MASTER
	 *
	 * @see DatabaseBase::selectField()
	 * @param $index integer DB_SLAVE or DB_MASTER
	 * @param $var string
	 * @param $conds Array
	 * @param $fname string
	 * @param $options Array
	 * @return mixed Whatever DatabaseBase::selectField() returns (the field value or false)
	 * @throws MWException
	 */
	final public function selectField( $index, $var, array $conds, $fname, $options = array() ) {
		$this->assertKeyCond( $conds ); // sanity
		return $this->getDBByIndex( $index )
			->selectField( $this->pTable, $var, $conds, $fname, $options );
	}

	/**
	 * @see DatabaseBase::insert()
	 * @see DatabaseBase::affectedRows()
	 * @param $rows Array A single row or a list of rows
	 * @param $fname String
	 * @param $options Array
	 * @return integer Number of affected rows
	 * @throws MWException
	 */
	final public function insert( array $rows, $fname, $options = array() ) {
		// Normalize a single row into a list of one row
		$rows = ( isset( $rows[0] ) && is_array( $rows[0] ) ) ? $rows : array( $rows );
		foreach ( $rows as $row ) {
			$this->assertKeyInRow( $row ); // sanity
		}

		$dbw = $this->getMasterDB();
		$dbw->insert( $this->pTable, $rows, $fname, $options );
		return $dbw->affectedRows();
	}

	/**
	 * @see DatabaseBase::update()
	 * @see DatabaseBase::affectedRows()
	 * @param $values Array
	 * @param $conds Array
	 * @param $fname String
	 * @return integer Number of affected rows
	 * @throws MWException
	 */
	final public function update( $values, array $conds, $fname ) {
		$this->assertKeyCond( $conds ); // sanity
		$this->assertKeyNotSet( $values ); // sanity

		$dbw = $this->getMasterDB();
		$dbw->update( $this->pTable, $values, $conds, $fname );
		return $dbw->affectedRows();
	}

	/**
	 * @see DatabaseBase::delete()
	 * @see DatabaseBase::affectedRows()
	 * @param $conds Array
	 * @param $fname String
	 * @return integer Number of affected rows
	 * @throws MWException
	 */
	final public function delete( array $conds, $fname ) {
		$this->assertKeyCond( $conds ); // sanity

		$dbw = $this->getMasterDB();
		$dbw->delete( $this->pTable, $conds, $fname );
		return $dbw->affectedRows();
	}

	/**
	 * Get a direct slave DB connection.
	 * Queries should always be done using the provided wrappers.
	 * This can be used to call functions like DatabaseBase::timestamp().
	 *
	 * @return DatabaseBase
	 */
	abstract public function getSlaveDB();

	/**
	 * Get a direct master DB connection.
	 * Queries should always be done using the provided wrappers.
	 * This can be used to call functions like DatabaseBase::timestamp().
	 *
	 * @return DatabaseBase
	 */
	abstract public function getMasterDB();

	/**
	 * Do a (partition key => value) assertion on a WHERE array.
	 * This protects against callers forgetting to add the shard column condition
	 * or querying the wrong table shard.
	 *
	 * When this object has no shard column value (it represents the entire table
	 * partition), there is nothing to restrict queries to, so no condition is required.
	 *
	 * @param $conds array
	 * @throws MWException
	 */
	final protected function assertKeyCond( array $conds ) {
		if ( $this->value === null ) {
			return; // whole-partition handle; queries may span all shard column values
		}
		$this->assertKeyInRow( $conds );
	}

	/**
	 * Do a (partition key => value) assertion on a SET clause for an UPDATE statement.
	 * This sanity checks that the shard column value is not getting changed, which would
	 * make the row inaccessible since it would probably then be placed on the wrong shard.
	 *
	 * @param $set array
	 * @throws MWException
	 */
	final protected function assertKeyNotSet( array $set ) {
		if ( isset( $set[$this->key] ) ) {
			throw new MWException( "Shard column '{$this->key}' given in SET clause." );
		}
	}

	/**
	 * Check that a (column => value) map includes the shard column and, if this
	 * object is restricted to a shard column value, that the two values match.
	 * Values are compared as strings.
	 *
	 * @param $row array
	 * @throws MWException
	 */
	private function assertKeyInRow( array $row ) {
		if ( !isset( $row[$this->key] ) ) {
			throw new MWException( "Shard column '{$this->key}' value not provided." );
		} elseif ( $this->value !== null
			&& strval( $row[$this->key] ) !== strval( $this->value )
		) {
			throw new MWException( "Shard column '{$this->key}' value is mismatched." );
		}
	}

	/**
	 * Map DB_SLAVE/DB_MASTER to the corresponding connection
	 *
	 * @param $index integer DB_SLAVE or DB_MASTER
	 * @return DatabaseBase
	 * @throws MWException
	 */
	private function getDBByIndex( $index ) {
		if ( $index == DB_SLAVE ) {
			return $this->getSlaveDB();
		} elseif ( $index == DB_MASTER ) {
			return $this->getMasterDB();
		}
		throw new MWException( "First argument must be DB_SLAVE or DB_MASTER." );
	}
}
require_once( dirname( __FILE__ ) . '/../Maintenance.php' );

/**
 * Maintenance script to help with new ExternalRDBStore clusters
 *
 * Reads database names (one per line) from the file given with --dblist and
 * issues CREATE DATABASE IF NOT EXISTS for each of them on the writer server
 * of the external cluster given with --cluster.
 *
 * @ingroup Maintenance
 */
class RDBAddCluster extends Maintenance {
	public function __construct() {
		parent::__construct();
		$this->addOption( "cluster", "Name of the DB cluster", true, true );
		$this->addOption( "dblist", "List of DBs to create", true, true );
		$this->mDescription = "Prepare a new DB cluster for RDB storage";
	}

	public function getDbType() {
		return Maintenance::DB_ADMIN;
	}

	public function execute() {
		// Fail loudly on a bad list file instead of reporting "Done." with nothing created
		$dbList = $this->getOption( 'dblist' );
		if ( !is_file( $dbList ) ) {
			$this->error( "Could not find DB list file '$dbList'.", 1 ); // die
		}
		$contents = file_get_contents( $dbList );
		if ( $contents === false ) {
			$this->error( "Could not read DB list file '$dbList'.", 1 ); // die
		}
		// One DB name per line; trim so that CRLF line endings do not leak into names
		$dbNames = array_filter( array_map( 'trim', explode( "\n", $contents ) ), 'strlen' );

		$lb = wfGetLBFactory()->getExternalLB( $this->getOption( 'cluster' ) );

		$server = $lb->getServerInfo( $lb->getWriterIndex() );
		$dbw = DatabaseBase::factory( $server['type'], $server );
		$type = $dbw->getType(); // DB server type (mysql, postgres)

		$this->output( "Creating databases from '$dbList'.\n" );
		foreach ( $dbNames as $dbName ) {
			$encDbName = $dbw->addIdentifierQuotes( $dbName );
			$dbw->query( "CREATE DATABASE IF NOT EXISTS $encDbName" );
			$this->output( "Created $type database '$dbName'.\n" );
		}
		$this->output( "Done.\n" );
	}
}

$maintClass = "RDBAddCluster";
require_once( RUN_MAINTENANCE_IF_MAIN );
require_once( dirname( __FILE__ ) . '/../Maintenance.php' );

/**
 * Maintenance script to copy a table to a sharded DB store
 *
 * Rows of the local table are read in batches ordered by --indexColumn and
 * inserted (with IGNORE) into the partition chosen by each row's --shardColumn value.
 *
 * @ingroup Maintenance
 */
class RDBPopulation extends Maintenance {
	public function __construct() {
		parent::__construct();
		$this->addOption( "table", "Local DB table name", true, true );
		$this->addOption( "indexColumn", "Indexed integer column", true, true );
		$this->addOption( "shardColumn", "Shard column", true, true );
		$this->addOption( "store", "The external RDB store name", true, true );
		$this->mDescription = "Copy a local table into sharded DB storage";
		$this->setBatchSize( 500 );
	}

	public function execute() {
		$table = $this->getOption( 'table' );
		$indexCol = $this->getOption( 'indexColumn' );
		$shardCol = $this->getOption( 'shardColumn' );

		$dbw = $this->getDB( DB_MASTER );
		$start = $dbw->selectField( $table, "MIN($indexCol)", false, __METHOD__ );
		$end = $dbw->selectField( $table, "MAX($indexCol)", false, __METHOD__ );
		// Only a missing value means "empty"; a minimum of 0 is a legitimate index value
		if ( $start === false || $start === null || $end === false || $end === null ) {
			$this->output( "...$table table seems to be empty.\n" );
			return;
		} elseif ( !$dbw->fieldExists( $table, $shardCol ) ) { // sanity
			$this->error( "Table '$table' missing column '$shardCol'.", 1 ); // die
		}
		$rdbStore = RDBStoreGroup::singleton()->getExternal( $this->getOption( 'store' ) );

		$first = true;
		$count = 0;
		// $indexCol may be a BIGINT or a UID, which are too large for PHP ints.
		// Also UIDs are sparse, rather than incrementing by units of 1 each time.
		// Lastly, $indexCol might be only the first column of a unique index.
		// NOTE(review): if $indexCol values are not unique, rows that share the last
		// value of a batch but fall beyond the LIMIT are skipped by the exclusive ">"
		// bound used for the next batch -- confirm that callers use a unique column.
		$blockStart = $start;
		do {
			$inequality = $first ? ">=" : ">"; // inclusive for the first block
			$res = $dbw->select( $table, '*',
				array(
					"$indexCol $inequality {$dbw->addQuotes( $blockStart )}",
					"$indexCol <= {$dbw->addQuotes( $end )}"
				),
				__METHOD__,
				array( 'ORDER BY' => $indexCol, 'LIMIT' => $this->mBatchSize )
			);
			$n = $dbw->numRows( $res );

			if ( $n ) {
				// Peek at the last row to find the highest index value of this batch
				$res->seek( $n - 1 );
				$blockEnd = $dbw->fetchObject( $res )->$indexCol;
				$this->output( "...doing $indexCol from $blockStart to $blockEnd, $n row(s)\n" );

				foreach ( $res as $row ) {
					$pTable = $rdbStore->getPartition( $table, $shardCol, $row->$shardCol );
					$count += $pTable->insert( (array)$row, __METHOD__, array( 'IGNORE' ) );
				}

				$blockStart = $blockEnd; // highest ID done so far
			}

			$first = false;
			wfWaitForSlaves( 5 );
		} while ( $n > 0 );

		$this->output( "Done. Inserted $count rows from '$table' table.\n" );
	}
}

$maintClass = "RDBPopulation";
require_once( RUN_MAINTENANCE_IF_MAIN );
require_once( dirname( __FILE__ ) . '/../Maintenance.php' );

/**
 * Maintenance script to help with ExternalRDBStore schema changes
 *
 * In 'generate' mode, a patch file with shard annotations is expanded into one
 * .sql file per shard under "<dir>/<cluster>/". In 'run' mode, the .sql files
 * found under "<dir>/<cluster>/" are sourced on the master of each cluster.
 *
 * @ingroup Maintenance
 */
class RDBSchemaChange extends Maintenance {
	public function __construct() {
		parent::__construct();
		$this->addOption( "mode", "Either 'generate' or 'run'", true, true );
		$this->addOption( "store", "The external RDB store name", true, true );
		$this->addOption( "patchfile", "The .sql file to run", false, true );
		$this->addOption( "dir", "Directory to generate or run .sql files", true, true );
		$this->mDescription = "Prepare or run a schema change for a table on a wiki";
	}

	public function getDbType() {
		return Maintenance::DB_ADMIN;
	}

	public function execute() {
		$mode = $this->getOption( 'mode' );
		if ( $mode !== 'run' && $mode !== 'generate' ) {
			$this->error( "Invalid mode '$mode'; must be 'generate' or 'run'.", 1 );
		}
		$dir = $this->getOption( 'dir' );
		if ( !is_dir( $dir ) && !mkdir( $dir ) ) {
			$this->error( "Invalid SQL file directory '$dir'.", 1 );
		}
		if ( $mode === 'run' ) {
			$this->runGeneratedSQL();
		} else {
			$this->generateSQL();
		}
	}

	/**
	 * Expand the patch file into one SQL file per shard of each cluster
	 *
	 * @return void
	 */
	protected function generateSQL() {
		$dir = $this->getOption( 'dir' );
		// Read in the SQL patch file...
		$patchFile = $this->getOption( 'patchfile' );
		if ( FileBackend::extensionFromPath( $patchFile ) !== 'sql' || !is_file( $patchFile ) ) {
			$this->error( "Invalid SQL patch file '$patchFile' given.", 1 );
		}
		$patchName = basename( $patchFile );
		$sql = file_get_contents( $patchFile );
		if ( $sql === false ) {
			$this->error( "Could not read SQL patch file '$patchFile'.", 1 );
		}
		// Every prefixed table reference must carry a shard annotation.
		// This does not depend on the shard, so check it once up front.
		$count = substr_count( $sql, '/*_*/' ); // table prefix notation
		if ( $count && $count != substr_count( $sql, '__#SHARD#__' ) ) {
			$this->error( "Could not transform SQL patch file '$patchFile'.", 1 );
		}
		// Get the SQL patch file to run on each cluster...
		$rdbStore = RDBStoreGroup::singleton()->getExternal( $this->getOption( 'store' ) );
		foreach ( $rdbStore->getClusterMapping() as $cluster => $shards ) {
			$clusterDir = "{$dir}/{$cluster}";
			if ( !is_dir( $clusterDir ) && !mkdir( $clusterDir ) ) {
				$this->error( "Could not create directory '$clusterDir'.", 1 );
			}
			// Go through each shard that this cluster serves...
			foreach ( $shards as $index ) {
				// Get the padded index shard number, which appears in tables names
				$i = $rdbStore->formatShardIndex( $index ); // e.g. "0200"
				// Transform the patch to account for the table sharding conventions.
				// E.g., replace things like "flaggedimages/*__#SHARD#__fi_rev_id__*/"
				// with things like "flaggedimages__0200__fi_rev_id".
				$shardSQL = preg_replace( '!/\*__#SHARD#__(\S+)\*/!U', "__{$i}__$1", $sql );
				if ( $shardSQL === null || $shardSQL === $sql ) {
					$this->error( "Could not transform SQL patch file '$patchFile'.", 1 );
				}
				// Save SQL patch to make the schema change for each table shard
				if ( file_put_contents( "{$clusterDir}/{$i}-{$patchName}", $shardSQL ) === false ) {
					$this->error( "Could not write SQL patch file '{$i}-{$patchName}'.", 1 );
				}
			}
		}
	}

	/**
	 * Source all generated SQL files on the master DB of their cluster
	 *
	 * @return void
	 */
	protected function runGeneratedSQL() {
		$wiki = $this->getOption( 'wiki' );
		if ( !strlen( $wiki ) ) {
			$this->error( "No wiki ID provided.", 1 );
		}
		$dir = $this->getOption( 'dir' );
		// Get the SQL patch file to run on each cluster...
		$rdbStore = RDBStoreGroup::singleton()->getExternal( $this->getOption( 'store' ) );
		foreach ( $rdbStore->getClusterMapping() as $cluster => $shards ) {
			$clusterDir = "{$dir}/{$cluster}";
			if ( !is_dir( $clusterDir ) ) {
				continue; // nothing was generated for this cluster
			}
			// scandir() gives a sorted list, so files are applied in a stable order,
			// and a file named "0" cannot end the iteration early.
			$files = scandir( $clusterDir );
			if ( $files === false ) {
				$this->error( "Could not read SQL file directory '$clusterDir'.", 1 );
			}
			$dbw = null; // connect lazily; only if there is something to run
			// Go through all generated SQL files for this cluster...
			foreach ( $files as $file ) {
				if ( FileBackend::extensionFromPath( $file ) !== 'sql' ) {
					continue;
				}
				if ( $dbw === null ) {
					$lb = wfGetLBFactory()->getExternalLB( $cluster );
					$dbw = $lb->getConnection( DB_MASTER, array(), $wiki );
				}
				$dbw->sourceFile( "{$clusterDir}/{$file}" );
			}
		}
	}
}

$maintClass = "RDBSchemaChange";
require_once( RUN_MAINTENANCE_IF_MAIN );
require_once( dirname( __FILE__ ) . '/../Maintenance.php' );

/**
 * Maintenance script to fix denormalized shards in a sharded DB store
 *
 * For each syncable table of the store, rows whose timestamp column lies in the
 * scan range are compared between the canonical shards and the duplicate shards:
 * rows missing from duplicate shards are inserted and rows in duplicate shards
 * without a canonical row are deleted. The end of the scan range is saved to a
 * position file and used as the start of the next run.
 *
 * @ingroup Maintenance
 */
class RDBSyncPartitions extends Maintenance {
	protected $sTimestamp; // integer; UNIX timestamp
	protected $eTimestamp; // integer; UNIX timestamp

	const TRX_LIMBO_SEC = 86400; // integer; prepared transactions this old are stale

	public function __construct() {
		parent::__construct();
		$this->addOption( "store", "The external RDB store name", true, true );
		$this->addOption( "cluster", "The name of a DB cluster within the store", true, true );
		$this->addOption( "posdir", "Directory to store last-scan timestamp", true, true );
		$this->addOption( "fixlimbo", "Resolve all day old limbo transactions (for 2PC)" );
		$this->mDescription = "Sync denormalized table shards within sharded DB storage";
		$this->setBatchSize( 1000 );
	}

	public function execute() {
		global $wgRDBStoredTables;

		$store = $this->getOption( 'store' );
		$cluster = $this->getOption( 'cluster' );
		$posdir = $this->getOption( 'posdir' );
		$fixlimbo = $this->hasOption( 'fixlimbo' );
		$wiki = wfWikiID();

		$rdbs = RDBStoreGroup::singleton()->getExternal( $store );
		$mapping = $rdbs->getClusterMapping(); // (cluster => indexes)
		if ( !isset( $mapping[$cluster] ) ) {
			$this->error( "Cluster '$cluster' does not exist in store '$store'.\n", 1 ); // die
		}
		$indexes = $mapping[$cluster]; // indexes handled by this cluster

		// Read the end position of the previous scan, if there was one
		wfMkdirParents( "{$posdir}/{$cluster}" );
		wfSuppressWarnings();
		$startTsUnix = (int)file_get_contents( "{$posdir}/{$cluster}/{$wiki}" );
		wfRestoreWarnings();
		$startTsUnix = $startTsUnix ? $startTsUnix : 1; /* default to ~epoch */

		$lb = wfGetLBFactory()->getExternalLB( $cluster );
		$dbw = $lb->getConnection( DB_MASTER );

		$endTsUnix = time(); // do not scan rows added after this point
		if ( $rdbs->is2PCEnabled() ) {
			$this->output( "Checking for limbo transactions in RDB store '$store'...\n" );
			foreach ( $rdbs->connGetAllPreparedXAInternal( $dbw ) as $trxId ) {
				if ( preg_match( '/^(\d{14})-([0-9a-f]{32})$/', $trxId, $matches ) ) {
					list( /* all */, $ts, /* uuid */ ) = $matches;
					$tsUnix = wfTimestamp( TS_UNIX, $ts );
					if ( $tsUnix < ( time() - self::TRX_LIMBO_SEC ) ) {
						if ( $fixlimbo ) { // resolve limbo transaction
							$rdbs->connRollbackXAInternal( $dbw, $trxId );
							$this->output( "Rolled back limbo transaction '$trxId'.\n" );
						} else {
							$this->output( "Detected limbo transaction '$trxId'.\n" );
							$endTsUnix = min( $tsUnix, $endTsUnix );
						}
					} else {
						// Prepared but not stale; do not scan past its start time
						$endTsUnix = min( $tsUnix, $endTsUnix );
					}
				}
			}
		}
		$this->output( "Done.\n" );

		$this->sTimestamp = $startTsUnix;
		$this->eTimestamp = $endTsUnix - 900; // good measure (skew factor)

		$this->output( "Row timestamp scan range is [" .
			wfTimestamp( TS_MW, $this->sTimestamp ) . "," .
			wfTimestamp( TS_MW, $this->eTimestamp ) . "]\n"
		);

		$this->output( "Syncronizing denormalized data in RDB store '$store'...\n" );
		if ( $rdbs->isPartitioned() && isset( $wgRDBStoredTables[$wiki] ) ) {
			foreach ( $wgRDBStoredTables[$wiki] as $table => $tstore ) { // each table
				$syncInfo = RDBStoreGroup::singleton()->getTableSyncInfo( $table );
				if ( $tstore === $store && is_array( $syncInfo ) ) { // syncable
					$this->output( "Syncing table $table...\n" );
					$count = $this->syncPush( $rdbs, $table, $syncInfo, $indexes );
					$this->output( "Pushed $count row(s) missing rows.\n" );
					$count = $this->syncPull( $rdbs, $table, $syncInfo, $indexes );
					$this->output( "Removed $count row(s) bogus rows.\n" );
				}
			}
		}
		$this->output( "Done.\n" );

		if ( file_put_contents( "{$posdir}/{$cluster}/{$wiki}", $this->eTimestamp ) !== false ) {
			$this->output( "Updated UNIX timestamp position file to '{$this->eTimestamp}'.\n" );
		}
	}

	/**
	 * Get the column => value pairs of $row with columns in the partition table
	 *
	 * @param $row array
	 * @param $tp RDBStoreTablePartition
	 * @return Array
	 */
	protected function getRowForTable( array $row, RDBStoreTablePartition $tp ) {
		$resRow = array();
		foreach ( $row as $field => $value ) {
			if ( $tp->getMasterDB()->fieldExists( $tp->getPartitionTable(), $field ) ) {
				$resRow[$field] = $value;
			}
		}
		return $resRow;
	}

	/**
	 * Get whole-partition objects (no shard column value) for a list of shard indexes
	 *
	 * @param $rdbs RDBStore
	 * @param $table string Virtual table name
	 * @param $shardKey string Shard column name
	 * @param $indexes array Shard indexes
	 * @return Array List of ExternalRDBStoreTablePartition objects
	 */
	protected function getPartitionsForIndexes(
		RDBStore $rdbs, $table, $shardKey, array $indexes
	) {
		$partitions = array();
		foreach ( $indexes as $index ) {
			$partitions[] = new ExternalRDBStoreTablePartition(
				$rdbs, $table, $index, $shardKey, null, wfWikiID() );
		}
		return $partitions;
	}

	/**
	 * Push updates from the canonical source shards to the duplicate shards.
	 * We don't need to select FOR UPDATE as we should only sync immutable data.
	 *
	 * @param $rdbs RDBStore
	 * @param $table string Virtual table name
	 * @param $info array Sync info (srcShardKey, dupShardKeys, uniqueKey, timeColumn)
	 * @param $indexes array Shard indexes to scan
	 * @return integer Number of rows fixed
	 */
	protected function syncPush( RDBStore $rdbs, $table, array $info, array $indexes ) {
		// 'uniqueKey' is a list of column names; make it usable with array_intersect_key()
		$uniqueCols = array_flip( $info['uniqueKey'] );
		$cpartitions = $this->getPartitionsForIndexes(
			$rdbs, $table, $info['srcShardKey'], $indexes );
		$count = 0;
		foreach ( $cpartitions as $cpartition ) { // for each canonical shard
			$dbw = $cpartition->getMasterDB();
			$start = $dbw->addQuotes( $dbw->timestamp( $this->sTimestamp ) );
			$end = $dbw->addQuotes( $dbw->timestamp( $this->eTimestamp ) );
			do {
				$limit = $this->mBatchSize; // limit used for this batch
				$res = $cpartition->select( DB_MASTER, '*',
					array( "{$info['timeColumn']} BETWEEN $start AND $end" ),
					__METHOD__,
					array( 'ORDER BY' => $info['timeColumn'], 'LIMIT' => $limit )
				);
				$n = $res->numRows();
				$oldStart = $start; // previous start
				foreach ( $res as $srcRow ) { // each canonical row
					$srcRow = (array)$srcRow;
					foreach ( $info['dupShardKeys'] as $shardKey ) { // each duplicate shard for row
						$dpartition = $rdbs->getPartition( $table, $shardKey, $srcRow[$shardKey] );
						// Always include the shard column, which the partition wrapper asserts on
						$dupRow = $dpartition->selectRow( DB_MASTER, '*',
							array( $shardKey => $srcRow[$shardKey] )
								+ array_intersect_key( $srcRow, $uniqueCols ),
							__METHOD__
						);
						if ( !$dupRow ) { // row missing in duplicate shard; create it
							$dupRow = $this->getRowForTable( $srcRow, $dpartition );
							$dpartition->insert( $dupRow, __METHOD__, array( 'IGNORE' ) );
							$count++;
						}
					}
					$start = $dbw->addQuotes( $srcRow[$info['timeColumn']] ); // advance
				}
				if ( $n && $start === $oldStart ) {
					$this->mBatchSize += 10; // don't get stuck
				}
				// Compare against the limit that was actually used for this batch, so that
				// growing the batch size above does not end the scan prematurely.
			} while ( $n >= $limit );
		}
		return $count;
	}

	/**
	 * Remove rows from the duplicate shards that have no row in the canonical source shards.
	 * We don't need to select FOR UPDATE as we should only sync immutable data.
	 *
	 * @param $rdbs RDBStore
	 * @param $table string Virtual table name
	 * @param $info array Sync info (srcShardKey, dupShardKeys, uniqueKey, timeColumn)
	 * @param $indexes array Shard indexes to scan
	 * @return integer Number of rows fixed
	 */
	protected function syncPull( RDBStore $rdbs, $table, array $info, array $indexes ) {
		// 'uniqueKey' is a list of column names; make it usable with array_intersect_key()
		$uniqueCols = array_flip( $info['uniqueKey'] );
		$srcKey = $info['srcShardKey'];
		$count = 0;
		foreach ( $info['dupShardKeys'] as $shardKey ) { // each shard key column
			$dpartitions = $this->getPartitionsForIndexes( $rdbs, $table, $shardKey, $indexes );
			foreach ( $dpartitions as $dpartition ) { // each duplicate shard
				$dbw = $dpartition->getMasterDB();
				$start = $dbw->addQuotes( $dbw->timestamp( $this->sTimestamp ) );
				$end = $dbw->addQuotes( $dbw->timestamp( $this->eTimestamp ) );
				do {
					$limit = $this->mBatchSize; // limit used for this batch
					$res = $dpartition->select( DB_MASTER, '*',
						array( "{$info['timeColumn']} BETWEEN $start AND $end" ),
						__METHOD__,
						array( 'ORDER BY' => $info['timeColumn'], 'LIMIT' => $limit )
					);
					$n = $res->numRows();
					$oldStart = $start; // previous start
					foreach ( $res as $dupRow ) { // each duplicate row
						$dupRow = (array)$dupRow;
						$cpartition = $rdbs->getPartition( $table, // canonical shard for row
							$srcKey, $dupRow[$srcKey] );
						// Always include the shard column, which the partition wrapper asserts on
						$exists = $cpartition->selectField( DB_MASTER, '1',
							array( $srcKey => $dupRow[$srcKey] )
								+ array_intersect_key( $dupRow, $uniqueCols ),
							__METHOD__
						);
						if ( !$exists ) { // row in duplicate shard is extraneous; delete it
							$dpartition->delete( $dupRow, __METHOD__ );
							$count++;
						}
						$start = $dbw->addQuotes( $dupRow[$info['timeColumn']] ); // advance
					}
					if ( $n && $start === $oldStart ) {
						$this->mBatchSize += 10; // don't get stuck
					}
					// Compare against the limit that was actually used for this batch, so that
					// growing the batch size above does not end the scan prematurely.
				} while ( $n >= $limit );
			}
		}
		return $count;
	}
}

$maintClass = "RDBSyncPartitions";
require_once( RUN_MAINTENANCE_IF_MAIN );
/*_*/mytable/*__#SHARD#__myt_id*/ ( + myt_id integer unsigned NOT NULL, + myt_name varchar(255) binary NOT NULL default '', + myt_timestamp varbinary(14) NULL, + myt_value varbinary(32) NOT NULL default '', + + PRIMARY KEY (myt_id) +) /*$wgDBTableOptions*/; + +CREATE UNIQUE INDEX /*i*/myt_value ON /*_*/mytable/*__#SHARD#__myt_id*/ (myt_value); |