summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAaron Schulz <aschulz@wikimedia.org>2012-06-24 10:41:27 -0700
committerAaron Schulz <aschulz@wikimedia.org>2012-12-04 15:51:12 -0800
commitb028f9a3d2afba6c54deb600ea1003afcc643b77 (patch)
tree2dc9c87ee2ed20e23504ac7c11c38e96c607df01
parent8112995e3767485f6d73393e6ed6ccfcb42b9d98 (diff)
[RDBStore] Added sharded, external, DB support.origin/sandbox/aaron/rdbstore
* Added LocalRDBStore and ExternalRDBStore classes and helper scripts. * Added $wgRDBStores variable to configure the RDB stores. * Added $wgRDBStoredTables to configure where tables are stored. * Added hooks so extensions can force tables to be on the same store and register denormalized tables that can be re-synced. Change-Id: Ic1e38db3d325d52ded6d2596af2b6bd3e9b870fe
-rw-r--r--docs/hooks.txt13
-rw-r--r--includes/AutoLoader.php10
-rw-r--r--includes/DefaultSettings.php30
-rw-r--r--includes/rdbstore/ExternalRDBStore.php608
-rw-r--r--includes/rdbstore/ExternalRDBStoreTablePartition.php89
-rw-r--r--includes/rdbstore/ExternalRDBStoreTrxJournal.php278
-rw-r--r--includes/rdbstore/LocalRDBStore.php128
-rw-r--r--includes/rdbstore/LocalRDBStoreTablePartition.php64
-rw-r--r--includes/rdbstore/RDBStore.php354
-rw-r--r--includes/rdbstore/RDBStoreGroup.php183
-rw-r--r--includes/rdbstore/RDBStoreTablePartition.php255
-rw-r--r--maintenance/rdbstore/RDBAddCluster.php66
-rw-r--r--maintenance/rdbstore/RDBPopulation.php98
-rw-r--r--maintenance/rdbstore/RDBSchemaChange.php124
-rw-r--r--maintenance/rdbstore/RDBSyncPartitions.php236
-rw-r--r--maintenance/rdbstore/sample.sql12
16 files changed, 2548 insertions, 0 deletions
diff --git a/docs/hooks.txt b/docs/hooks.txt
index 0c8780dd28c5..84dbf2487ffa 100644
--- a/docs/hooks.txt
+++ b/docs/hooks.txt
@@ -1768,6 +1768,19 @@ $out: OutputPage object
&$obj: RawPage object
&$text: The text that's going to be the output
+'RDBStoreTableSchemaInfo': Called when the RDBStoreGroup singleton is initialized.
+&$groups : Map of group names to a list of tables.
+ Extensions may register tables here to make sure groups of tables are in the same store.
+&$syncInfo : Map of tables to denormalization information, which consists of:
+ - srcShardKey : the canonical shard column name
+ - dupShardKeys : list of shard columns for duplicated rows
+ - uniqueKey : list of the fields that comprise a unique key
+ - timeColumn : indexed timestamp column name
+ This should only be used for tables that duplicate data to shard on different
+ columns (e.g. user and object ID) and where the table is also either:
+ - (a) insert-only (no updates/deletes)
+ - (b) insert/update-only and all duplicate rows only have immutable columns
+
'RecentChange_save': called at the end of RecentChange::save()
$recentChange: RecentChange object
diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php
index 461fadfe66fe..22ba0c461d8a 100644
--- a/includes/AutoLoader.php
+++ b/includes/AutoLoader.php
@@ -510,6 +510,16 @@ $wgAutoloadLocalClasses = array(
'SavepointPostgres' => 'includes/db/DatabasePostgres.php',
'SQLiteField' => 'includes/db/DatabaseSqlite.php',
+ # includes/rdbstore
+ 'ExternalRDBStore' => 'includes/rdbstore/ExternalRDBStore.php',
+ 'ExternalRDBStoreTablePartition' => 'includes/rdbstore/ExternalRDBStoreTablePartition.php',
+ 'ExternalRDBStoreTrxJournal' => 'includes/rdbstore/ExternalRDBStoreTrxJournal.php',
+ 'LocalRDBStore' => 'includes/rdbstore/LocalRDBStore.php',
+ 'LocalRDBStoreTablePartition' => 'includes/rdbstore/LocalRDBStoreTablePartition.php',
+ 'RDBStore' => 'includes/rdbstore/RDBStore.php',
+ 'RDBStoreGroup' => 'includes/rdbstore/RDBStoreGroup.php',
+ 'RDBStoreTablePartition' => 'includes/rdbstore/RDBStoreTablePartition.php',
+
# includes/debug
'MWDebug' => 'includes/debug/Debug.php',
diff --git a/includes/DefaultSettings.php b/includes/DefaultSettings.php
index 2073f168b732..233142109f90 100644
--- a/includes/DefaultSettings.php
+++ b/includes/DefaultSettings.php
@@ -1626,6 +1626,36 @@ $wgExternalServers = array();
$wgDefaultExternalStore = false;
/**
+ * Associative array of storage names to configuration arrays, each having:
+ * - clusterPrefix : The prefix to each external cluster name
+ * - clusterCount : The number of clusters in the store (power of 2)
+ * The clusters are all named <clusterPrefix><x>, where x is in [1,<clusterCount>].
+ * These clusters need to actually exist in $wgExternalServers or $wgLBFactoryConf.
+ *
+ * This setting must be global to all wikis that may use the store.
+ *
+ * Example use:
+ * $wgRDBStores['store1'] = array( 'clusterPrefix' => 'main-rdb-cluster', 'clusterCount' => 2 )
+ *
+ * @var array
+ */
+$wgRDBStores = array();
+
+/**
+ * Map of tables to external $wgRDBStores names for each wiki.
+ * Core or extensions may check this to determine where a table is placed.
+ * This can be used to partition large tables into horizontally scalable RDBMs.
+ *
+ * This setting must be global to all wikis that may use the store.
+ *
+ * Example use:
+ * $wgRDBStoredTables['mywiki'] = array( 'table1' => 'store1', 'table2' => 'store1' )
+ *
+ * @var array
+ */
+$wgRDBStoredTables = array();
+
+/**
* Revision text may be cached in $wgMemc to reduce load on external storage
* servers and object extraction overhead for frequently-loaded revisions.
*
diff --git a/includes/rdbstore/ExternalRDBStore.php b/includes/rdbstore/ExternalRDBStore.php
new file mode 100644
index 000000000000..536f523eb411
--- /dev/null
+++ b/includes/rdbstore/ExternalRDBStore.php
@@ -0,0 +1,608 @@
+<?php
+/**
+ * This file deals with sharded RDBMs stores.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup RDBStore
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class representing an external DB storage system.
+ * Tables are sharded vertically based on the wiki ID and
+ * horizontally based on a table column used as a "shard key".
+ *
+ * The shard key determines what cluster a table partition maps to.
+ * We use cluster = (integerhash(column value) mod (# of clusters)) + 1.
+ * The 1 is added to the remainder so the cluster names start at "1".
+ *
+ * The number of clusters must be a power of 2. This makes the re-balancing required
+ * with the addition of new clusters fairly straightforward and avoids downtime.
+ * For example, say we have four clusters:
+ * cluster1 [hash has remainder 0]
+ * cluster2 [hash has remainder 1]
+ * cluster3 [hash has remainder 2]
+ * cluster4 [hash has remainder 3]
+ * We can add four new clusters, resulting in the following:
+ * cluster1 [remainder (0 now, 0 before)]
+ * cluster2 [remainder (1 now, 1 before)]
+ * cluster3 [remainder (2 now, 2 before)]
+ * cluster4 [remainder (3 now, 3 before)]
+ * cluster5 [start as replica of cluster1] [remainder (4 now, 0 before)]
+ * cluster6 [start as replica of cluster2] [remainder (5 now, 1 before)]
+ * cluster7 [start as replica of cluster3] [remainder (6 now, 2 before)]
+ * cluster8 [start as replica of cluster4] [remainder (7 now, 3 before)]
+ * What was in cluster1 is now split between cluster1 and cluster5.
+ * Since cluster5 started as a full clone of cluster1 (via MySQL replication),
+ * the only disruption will be a brief read-only period where cluster5 is
+ * caught up and the ExternalRDBStore cluster config is updated on the servers.
+ * After the change is done, there will be unused, outdated, duplicate partition
+ * tables on the wrong shards. These can be dropped as needed for disk space.
+ * The same trick can be used to keep doubling the amount of storage.
+ *
+ * If 2PC is enabled, then $wgDebugLogGroups['RDBStoreTrx'] should be set.
+ *
+ * @ingroup RDBStore
+ * @since 1.20
+ */
+class ExternalRDBStore extends RDBStore {
+ /** @var Array */
+ protected $clusters = array(); // list of cluster names, built as <clusterPrefix><1..clusterCount>
+ /** @var ExternalRDBStoreTrxJournal */
+ protected $trxJournal;
+ /** @var Array */
+ protected $connsWithTrx = array(); // list of DatabaseBase objects registered for the current transaction
+
+ protected $trxIdXA = null; // string|null; ID of the current XA transaction (null when none was started)
+
+ protected $name; // string; taken from the 'name' option
+ protected $clusterCount; // integer; number of clusters (power of 2)
+ protected $clusterPrefix; // string; prefix of every cluster name
+ protected $enable2PC = false; // bool; whether XA (two-phase commit) transactions are used
+
+ const SHARD_COUNT = 256; // integer; consistent partitions per (wiki,table) (power of 2)
+
+	/**
+	 * Build the store from its configuration and optionally resolve limbo 2PC transactions.
+	 *
+	 * @param $options array Store configuration, including:
+	 *   - name          : Name of this store (returned by getName())
+	 *   - clusterPrefix : Non-empty prefix shared by all cluster names
+	 *   - clusterCount  : Number of clusters; a positive integer power of 2
+	 *   - enable2PC     : Whether to use XA (two-phase commit) transactions [optional]
+	 *   - liveCheck2PC  : Whether to run the journal's periodic log check right away;
+	 *                     defaults to the value of enable2PC [optional]
+	 * @throws MWException
+	 */
+	public function __construct( array $options ) {
+		parent::__construct( $options );
+
+		$this->name = $options['name'];
+		if ( !strlen( $options['clusterPrefix'] ) ) {
+			throw new MWException( "Option 'clusterPrefix' is not valid." );
+		}
+		$this->clusterPrefix = $options['clusterPrefix'];
+
+		// Validate with integer arithmetic: a log()/floor() comparison accepts 0
+		// (log(0) is -INF, which equals its own floor) and fractions such as 0.5,
+		// and a zero count would later cause a modulo by zero in getClusterForIndex().
+		$count = $options['clusterCount'];
+		$isPowerOfTwo = is_numeric( $count )
+			&& $count >= 1
+			&& (int)$count == $count
+			&& ( (int)$count & ( (int)$count - 1 ) ) === 0;
+		if ( !$isPowerOfTwo ) {
+			throw new MWException( "Option 'clusterCount' must be a power of 2." );
+		}
+		$this->clusterCount = (int)$count;
+
+		// Cluster names are 1-based: <clusterPrefix>1 .. <clusterPrefix><clusterCount>
+		for ( $i = 1; $i <= $this->clusterCount; $i++ ) {
+			$this->clusters[] = $options['clusterPrefix'] . $i;
+		}
+
+		if ( isset( $options['enable2PC'] ) ) {
+			$this->enable2PC = $options['enable2PC'];
+		}
+		$liveCheck2PC = isset( $options['liveCheck2PC'] )
+			? $options['liveCheck2PC']
+			: $this->enable2PC;
+
+		$this->trxJournal = new ExternalRDBStoreTrxJournal( $this );
+		if ( $liveCheck2PC ) {
+			$this->trxJournal->periodicLogCheck(); // resolve limbo transactions
+		}
+	}
+
+ /**
+ * Get the store name that was supplied via the 'name' constructor option.
+ *
+ * @see RDBStore::getName()
+ * @return string
+ */
+ public function getName() {
+ return $this->name;
+ }
+
+ /**
+ * This class represents an external store, so this always returns false.
+ *
+ * @see RDBStore::isInternal()
+ * @return bool
+ */
+ public function isInternal() {
+ return false;
+ }
+
+ /**
+ * Tables in this store are split into SHARD_COUNT partitions, so this always returns true.
+ *
+ * @see RDBStore::isPartitioned()
+ * @return bool
+ */
+ public function isPartitioned() {
+ return true;
+ }
+
+	/**
+	 * Mark the start of the outermost transaction.
+	 * With 2PC enabled, a fresh XA transaction ID ("<timestamp>-<random string>") is
+	 * generated; no DB statement is issued here in either mode.
+	 *
+	 * @see RDBStore::beginOutermost()
+	 * @see DatabaseBase::begin()
+	 * @return void
+	 */
+	protected function beginOutermost() {
+		// Nothing is sent to any DB at this point: ExternalRDBStoreTablePartition wraps
+		// DB write queries in a transaction as connections are handed out, tracks those
+		// connections in $this->connsWithTrx, and finishOutermost() commits them.
+		if ( !$this->enable2PC ) {
+			wfDebug( "Starting transaction.\n" );
+			return;
+		}
+		$this->trxIdXA = wfTimestamp( TS_MW ) . '-' . wfRandomString( 32 );
+		wfDebug( "Starting XA transaction with ID {$this->trxIdXA}.\n" );
+	}
+
+ /**
+ * Commit everything registered in $this->connsWithTrx for the outermost transaction.
+ *
+ * Connections with an open regular transaction get a plain COMMIT. Connections that
+ * are part of the XA transaction are committed in one phase when there is exactly one
+ * of them; otherwise they are all prepared and then committed, with the transaction
+ * journal notified before the prepare phase, before the commit phase and after it.
+ *
+ * If a prepare fails with a DBError, the connections that were already prepared are
+ * rolled back (best effort) and the original exception is re-thrown.
+ *
+ * @see RDBStore::finishOutermost()
+ * @see DatabaseBase::commit()
+ * @return void
+ * @throws DBError
+ */
+ protected function finishOutermost() {
+ $xaTrxConns = array(); // DB servers in the XA transaction
+ // End all transactions, committing regular local transactions...
+ foreach ( $this->connsWithTrx as $conn ) { // DBs with XA/local transactions
+ if ( isset( $conn->xa_trx_id ) ) { // part of XA transaction
+ $xaTrxConns[] = $conn;
+ } elseif ( $conn->trxLevel() ) { // non-XA transaction
+ $conn->commit(); // regular COMMIT
+ $this->unregisterConnTrxInternal( $conn );
+ }
+ }
+ // Prepare and commit the (ended) XA transactions...
+ if ( count( $xaTrxConns ) == 1 ) { // single DB transaction (1PC)
+ $conn = reset( $xaTrxConns );
+ $this->connCommitXAInternal( $conn, '1PC' ); // also unregisters
+ wfDebug( "Committed XA transaction with ID {$this->trxIdXA}.\n" );
+ // The count == 1 case was handled above, so this branch is only taken for 2+ connections
+ } elseif ( count( $xaTrxConns ) >= 1 ) { // distributed DB transaction (2PC)
+ $clusters = array_map( function( $conn ) { return $conn->xa_cluster; }, $xaTrxConns );
+ $this->trxJournal->onPrePrepare( $this->trxIdXA, $clusters );
+ $connsPrepared = array(); // DBs servers prepared in the XA transaction
+ try { // try to prepare on all DB servers...
+ foreach ( $xaTrxConns as $conn ) {
+ $this->connPrepareXAInternal( $conn );
+ $connsPrepared[] = $conn;
+ }
+ } catch ( DBError $e ) { // rollback the already-prepared DB servers on failure...
+ // NOTE(review): the connection whose prepare failed, and any not yet attempted,
+ // are not rolled back or unregistered here, and $this->trxIdXA is not reset
+ // because of the re-throw below -- confirm the caller cleans these up.
+ foreach ( $connsPrepared as $conn ) {
+ try { // rollback as much as we can now
+ $this->connRollbackXAInternal( $conn ); // also unregisters
+ } catch ( DBError $e2 ) {};
+ }
+ throw $e; // throw original exception back
+ }
+ $this->trxJournal->onPreCommit();
+ foreach ( $connsPrepared as $conn ) {
+ $this->connCommitXAInternal( $conn, '2PC' ); // also unregisters
+ }
+ $this->trxJournal->onPostCommit();
+ wfDebug( "Committed distributed XA transaction with ID {$this->trxIdXA}.\n" );
+ } else {
+ wfDebug( "Committed transaction.\n" );
+ }
+ $this->trxIdXA = null; // done
+ }
+
+ /**
+ * Get an object representing a shard of a virtual DB table.
+ * Each table is sharded on at least one column key, and possibly
+ * denormalized and sharded on multiple column keys (e.g. rev ID, page ID, user ID).
+ *
+ * The shard index is derived from the first four hex digits of sha1( $value ),
+ * reduced modulo SHARD_COUNT.
+ *
+ * @see RDBStore::doGetPartition()
+ * @param $table string Virtual table name
+ * @param $column string Shard key column name
+ * @param $value string Shard key column value
+ * @param $wiki string Wiki ID
+ * @return ExternalRDBStoreTablePartition
+ */
+ protected function doGetPartition( $table, $column, $value, $wiki ) {
+ // Map this row to a consistent table shard, which only depends on $value.
+ // This mapping MUST always remain consistent (immutable)!
+ $hash = substr( sha1( $value ), 0, 4 ); // 4 hex digits: 65536 possible values
+ $index = (int)base_convert( $hash, 16, 10 ) % self::SHARD_COUNT; // [0,SHARD_COUNT-1], i.e. [0,255]
+ return new ExternalRDBStoreTablePartition( $this, $table, $index, $column, $value, $wiki );
+ }
+
+	/**
+	 * Build one partition object per shard index, in ascending index order.
+	 * The partitions carry no shard key value (null is passed).
+	 *
+	 * @see RDBStore::doGetAllPartitions()
+	 * @param $table string Virtual table name
+	 * @param $column string Shard key column name
+	 * @param $wiki string Wiki ID
+	 * @return Array List of ExternalRDBStoreTablePartition objects
+	 */
+	protected function doGetAllPartitions( $table, $column, $wiki ) {
+		$all = array();
+		foreach ( range( 0, self::SHARD_COUNT - 1 ) as $shardIndex ) {
+			$partition = new ExternalRDBStoreTablePartition(
+				$this, $table, $shardIndex, $column, null, $wiki
+			);
+			$all[] = $partition;
+		}
+		return $all;
+	}
+
+ /**
+ * Get the cluster names built by the constructor (<clusterPrefix>1, <clusterPrefix>2, ...).
+ * Outside callers should generally not need this and should avoid using it
+ *
+ * @return Array List of cluster names for this store.
+ */
+ public function getClusters() {
+ return $this->clusters;
+ }
+
+	/**
+	 * Group every shard index under the name of the DB cluster that serves it.
+	 * Outside callers should generally not need this and should avoid using it.
+	 *
+	 * @return Array Map of (cluster name => list of shard indexes)
+	 */
+	public function getClusterMapping() {
+		$indexesByCluster = array();
+		foreach ( range( 0, self::SHARD_COUNT - 1 ) as $shardIndex ) {
+			$cluster = $this->getClusterForIndex( $shardIndex );
+			if ( !isset( $indexesByCluster[$cluster] ) ) {
+				$indexesByCluster[$cluster] = array();
+			}
+			$indexesByCluster[$cluster][] = $shardIndex;
+		}
+		return $indexesByCluster;
+	}
+
+ /**
+ * Report the value of the 'enable2PC' constructor option (false when it was not given).
+ * Outside callers should generally not need this and should avoid using it.
+ *
+ * @return bool Two-Phase-Commit is enabled
+ */
+ public function is2PCEnabled() {
+ return $this->enable2PC;
+ }
+
+	/**
+	 * Zero-pad a shard number to the digit count of the highest shard index.
+	 * Outside callers should generally not need this and should avoid using it.
+	 *
+	 * @param $index integer
+	 * @return string
+	 */
+	public function formatShardIndex( $index ) {
+		// Width is the number of digits in the largest index (SHARD_COUNT - 1)
+		$width = strlen( (string)( self::SHARD_COUNT - 1 ) );
+		$format = '%0' . $width . 'd';
+		return sprintf( $format, $index ); // e.g "033"
+	}
+
+	/**
+	 * Map a partition index to the name of the cluster that holds it.
+	 * Outside callers should generally not need this and should avoid using it
+	 *
+	 * @param $index integer Partition index
+	 * @return string Name of DB cluster responsible for this shard index
+	 * @throws MWException
+	 */
+	public function getClusterForIndex( $index ) {
+		$outOfRange = ( $index < 0 || $index >= self::SHARD_COUNT );
+		if ( $outOfRange ) {
+			throw new MWException( "Index $index is not a valid partition index." );
+		}
+		// This mapping MUST always remain consistent (immutable)!
+		// Cluster numbering is 1-based, hence the +1 on the remainder.
+		$clusterNumber = ( $index % $this->clusterCount ) + 1;
+		return $this->clusterPrefix . $clusterNumber;
+	}
+
+	/**
+	 * Get the external load balancer of the cluster that serves a partition index.
+	 * Outside callers should generally not need this and should avoid using it
+	 *
+	 * @param $index integer Partition index
+	 * @return LoadBalancer For the cluster the partition index is on
+	 */
+	public function getClusterLBForIndex( $index ) {
+		$cluster = $this->getClusterForIndex( $index );
+		return wfGetLBFactory()->getExternalLB( $cluster );
+	}
+
+	/**
+	 * List the partition tables of a virtual table that exist on their master DB.
+	 * This can be used to verify schema change completion for a table.
+	 *
+	 * @see DatabaseBase::tableExists()
+	 * @param $table string Virtual table name
+	 * @param $column string Shard key column name
+	 * @param $wiki string|bool Wiki ID (false means the current wiki)
+	 * @return Array List of partition table names
+	 */
+	public function partitionsWhereTableExists( $table, $column, $wiki = false ) {
+		$found = array();
+		if ( $wiki === false ) {
+			$wiki = wfWikiID();
+		}
+		foreach ( $this->doGetAllPartitions( $table, $column, $wiki ) as $partition ) {
+			$dbw = $partition->getMasterDB();
+			if ( !$dbw->tableExists( $partition->getPartitionTable() ) ) {
+				continue;
+			}
+			$found[] = $partition->getPartitionTable();
+		}
+		return $found;
+	}
+
+	/**
+	 * List the partition tables of a virtual table that do NOT exist on their master DB.
+	 * This is the full list of expected partition table names minus the ones that
+	 * partitionsWhereTableExists() reports; array keys of the full list are preserved.
+	 * This can be used to verify schema change completion for a table.
+	 *
+	 * @see DatabaseBase::tableExists()
+	 * @param $table string Virtual table name
+	 * @param $column string Shard key column name
+	 * @param $wiki string|bool Wiki ID (false means the current wiki)
+	 * @return Array List of partition table names
+	 */
+	public function partitionsWhereTableNotExists( $table, $column, $wiki = false ) {
+		$expected = $this->getPartitionTableNames( $table, $column );
+		$present = $this->partitionsWhereTableExists( $table, $column, $wiki );
+		return array_diff( $expected, $present );
+	}
+
+	/**
+	 * List the partition tables of a virtual table that have a given column.
+	 * This can be used to verify schema change completion for a table.
+	 *
+	 * @see DatabaseBase::fieldExists()
+	 * @param $table string Virtual table name
+	 * @param $column string Shard key column name
+	 * @param $field string Column name to check
+	 * @param $wiki string|bool Wiki ID (false means the current wiki)
+	 * @return Array List of partition table names
+	 */
+	public function partitionsWhereFieldExists( $table, $column, $field, $wiki = false ) {
+		$found = array();
+		if ( $wiki === false ) {
+			$wiki = wfWikiID();
+		}
+		foreach ( $this->doGetAllPartitions( $table, $column, $wiki ) as $partition ) {
+			$dbw = $partition->getMasterDB();
+			if ( !$dbw->fieldExists( $partition->getPartitionTable(), $field ) ) {
+				continue;
+			}
+			$found[] = $partition->getPartitionTable();
+		}
+		return $found;
+	}
+
+	/**
+	 * List the partition tables of a virtual table that lack a given column.
+	 * This is the full list of expected partition table names minus the ones that
+	 * partitionsWhereFieldExists() reports; array keys of the full list are preserved.
+	 * This can be used to verify schema change completion for a table.
+	 *
+	 * @see DatabaseBase::fieldExists()
+	 * @param $table string Virtual table name
+	 * @param $column string Shard key column name
+	 * @param $field string Column name to check
+	 * @param $wiki string|bool Wiki ID (false means the current wiki)
+	 * @return Array List of partition table names
+	 */
+	public function partitionsWhereFieldNotExists( $table, $column, $field, $wiki = false ) {
+		$expected = $this->getPartitionTableNames( $table, $column );
+		$present = $this->partitionsWhereFieldExists( $table, $column, $field, $wiki );
+		return array_diff( $expected, $present );
+	}
+
+	/**
+	 * List the partition tables of a virtual table that have a given index.
+	 * This can be used to verify schema change completion for a table.
+	 *
+	 * @see DatabaseBase::indexExists()
+	 * @param $table string Virtual table name
+	 * @param $column string Shard key column name
+	 * @param $index string Index name to check
+	 * @param $wiki string|bool Wiki ID (false means the current wiki)
+	 * @return Array List of partition table names
+	 */
+	public function partitionsWhereIndexExists( $table, $column, $index, $wiki = false ) {
+		$found = array();
+		if ( $wiki === false ) {
+			$wiki = wfWikiID();
+		}
+		foreach ( $this->doGetAllPartitions( $table, $column, $wiki ) as $partition ) {
+			$dbw = $partition->getMasterDB();
+			if ( !$dbw->indexExists( $partition->getPartitionTable(), $index ) ) {
+				continue;
+			}
+			$found[] = $partition->getPartitionTable();
+		}
+		return $found;
+	}
+
+	/**
+	 * List the partition tables of a virtual table that lack a given index.
+	 * This is the full list of expected partition table names minus the ones that
+	 * partitionsWhereIndexExists() reports; array keys of the full list are preserved.
+	 * This can be used to verify schema change completion for a table.
+	 *
+	 * @see DatabaseBase::indexExists()
+	 * @param $table string Virtual table name
+	 * @param $column string Shard key column name
+	 * @param $index string Index name to check
+	 * @param $wiki string|bool Wiki ID (false means the current wiki)
+	 * @return Array List of partition table names
+	 */
+	public function partitionsWhereIndexNotExists( $table, $column, $index, $wiki = false ) {
+		$expected = $this->getPartitionTableNames( $table, $column );
+		$present = $this->partitionsWhereIndexExists( $table, $column, $index, $wiki );
+		return array_diff( $expected, $present );
+	}
+
+	/**
+	 * Build the names of all partition tables for a virtual DB table and shard column.
+	 * Names have the form "<table>__<zero-padded shard index>__<column>" and are
+	 * returned in ascending shard index order.
+	 *
+	 * @param $table string Virtual DB table
+	 * @param $column string Column the table is sharded on
+	 * @return Array List of partition table names
+	 * @throws MWException If $table or $column is not a string
+	 */
+	protected function getPartitionTableNames( $table, $column ) {
+		$namesOk = is_string( $table ) && is_string( $column );
+		if ( !$namesOk ) {
+			throw new MWException( "Missing table or column name." );
+		}
+		$names = array();
+		foreach ( range( 0, self::SHARD_COUNT - 1 ) as $shardIndex ) {
+			$padded = $this->formatShardIndex( $shardIndex ); // e.g "033"
+			$names[] = "{$table}__{$padded}__{$column}";
+		}
+		return $names;
+	}
+
+ /**
+ * Get the ID of the current XA transaction.
+ * The ID is generated in beginOutermost() when 2PC is enabled and is reset to
+ * null at the end of finishOutermost(), so null means no XA transaction is open.
+ * Do not call this function from places outside ExternalRDBStore
+ *
+ * @return string|null Format is "<14 digits>-<32 hex chars>"
+ */
+ public function getTrxIdXAInternal() {
+ return $this->trxIdXA;
+ }
+
+	/**
+	 * Track a connection as taking part in the current transaction.
+	 * Registering the same connection object again is a no-op: partitions register
+	 * their connection every time it is requested, and duplicate entries would
+	 * otherwise linger after commit or be processed twice in finishOutermost().
+	 * Do not call this function from places outside ExternalRDBStore
+	 *
+	 * @param $conn DatabaseBase
+	 * @return void
+	 */
+	public function registerConnTrxInternal( DatabaseBase $conn ) {
+		// Strict search compares object identity rather than property-wise equality
+		if ( !in_array( $conn, $this->connsWithTrx, true ) ) {
+			$this->connsWithTrx[] = $conn;
+		}
+	}
+
+	/**
+	 * Stop tracking a connection as part of the current transaction.
+	 * Every entry for this exact connection object is removed; nothing happens
+	 * if the connection was not registered.
+	 * Do not call this function from places outside ExternalRDBStore
+	 *
+	 * @param $conn DatabaseBase
+	 * @return void
+	 */
+	public function unregisterConnTrxInternal( DatabaseBase $conn ) {
+		// array_search() returns false when nothing matches, and using false as an
+		// array key addresses index 0, which would drop an unrelated connection.
+		// array_keys() in strict mode matches by object identity and yields an
+		// empty list when the connection is not registered.
+		foreach ( array_keys( $this->connsWithTrx, $conn, true ) as $key ) {
+			unset( $this->connsWithTrx[$key] );
+		}
+	}
+
+	/**
+	 * Make a connection join the current XA transaction, unless it already carries
+	 * an XA transaction ID. The connection is tagged with the cluster name and the
+	 * transaction ID and then registered with this store.
+	 * Do not call this function from places outside ExternalRDBStore
+	 *
+	 * @param $conn DatabaseBase
+	 * @param $cluster string Cluster name for $conn
+	 * @return void
+	 * @throws DBUnexpectedError
+	 */
+	public function connStartXAInternal( DatabaseBase $conn, $cluster ) {
+		if ( !strlen( $cluster ) ) {
+			throw new DBUnexpectedError( $conn, "No DB cluster name provided." );
+		}
+		if ( isset( $conn->xa_trx_id ) ) {
+			return; // in XA transaction already
+		}
+		$conn->clearFlag( DBO_TRX ); // use XA statements instead of BEGIN
+		$dbType = $conn->getType();
+		if ( $dbType === 'mysql' ) {
+			$conn->query( "XA START {$conn->addQuotes($this->trxIdXA)}", __METHOD__ );
+		} elseif ( $dbType === 'postgres' ) {
+			$conn->query( "BEGIN", __METHOD__ );
+		} else {
+			throw new DBUnexpectedError( $conn, "DB does not support 2PC." );
+		}
+		$conn->xa_cluster = $cluster;
+		$conn->xa_trx_id = $this->trxIdXA;
+		$this->registerConnTrxInternal( $conn );
+	}
+
+	/**
+	 * Run the prepare phase of the current XA transaction on a connection.
+	 * MySQL needs the transaction ended (XA END) before XA PREPARE; PostgreSQL
+	 * uses a single PREPARE TRANSACTION statement.
+	 * Do not call this function from places outside ExternalRDBStore
+	 *
+	 * @param $conn DatabaseBase
+	 * @return void
+	 * @throws DBUnexpectedError
+	 */
+	public function connPrepareXAInternal( DatabaseBase $conn ) {
+		$dbType = $conn->getType();
+		if ( $dbType !== 'mysql' && $dbType !== 'postgres' ) {
+			throw new DBUnexpectedError( $conn, "DB does not support 2PC." );
+		}
+		if ( $dbType === 'mysql' ) {
+			$conn->query( "XA END {$conn->addQuotes($this->trxIdXA)}", __METHOD__ );
+			$conn->query( "XA PREPARE {$conn->addQuotes($this->trxIdXA)}", __METHOD__ );
+			return;
+		}
+		$conn->query( "PREPARE TRANSACTION {$conn->addQuotes($this->trxIdXA)}", __METHOD__ );
+	}
+
+	/**
+	 * Commit an XA transaction on a connection.
+	 * In '1PC' mode the transaction has not been prepared: MySQL gets XA END followed
+	 * by XA COMMIT ... ONE PHASE, PostgreSQL a plain COMMIT. In '2PC' mode the already
+	 * prepared transaction is committed. Afterwards the XA tag is removed from the
+	 * connection, and the connection is unregistered when $id is the current transaction.
+	 * Do not call this function from places outside ExternalRDBStore
+	 *
+	 * @param $conn DatabaseBase
+	 * @param $mode string Either '1PC' or '2PC'
+	 * @param $id string Transaction ID (defaults to current)
+	 * @return void
+	 * @throws DBUnexpectedError
+	 */
+	public function connCommitXAInternal( DatabaseBase $conn, $mode = '2PC', $id = null ) {
+		$id = $id ? $id : $this->trxIdXA;
+		if ( $conn->getType() === 'mysql' ) {
+			if ( $mode === '1PC' ) {
+				// End and commit the same transaction: both statements must use the
+				// resolved $id, not the store's current ID, when a caller passes one.
+				$conn->query( "XA END {$conn->addQuotes($id)}", __METHOD__ );
+				$conn->query( "XA COMMIT {$conn->addQuotes($id)} ONE PHASE", __METHOD__ );
+			} else { // 2PC
+				$conn->query( "XA COMMIT {$conn->addQuotes($id)}", __METHOD__ );
+			}
+		} elseif ( $conn->getType() === 'postgres' ) {
+			if ( $mode === '1PC' ) {
+				$conn->query( "COMMIT", __METHOD__ );
+			} else { // 2PC
+				$conn->query( "COMMIT PREPARED {$conn->addQuotes($id)}", __METHOD__ );
+			}
+		} else {
+			throw new DBUnexpectedError( $conn, "DB does not support 2PC." );
+		}
+		unset( $conn->xa_trx_id ); // done
+		if ( $id === $this->trxIdXA ) { // from begin()/finish()?
+			$this->unregisterConnTrxInternal( $conn );
+		}
+	}
+
+	/**
+	 * Roll back an XA transaction on a connection (XA ROLLBACK on MySQL,
+	 * ROLLBACK PREPARED on PostgreSQL). Afterwards the XA tag is removed from the
+	 * connection, and the connection is unregistered when $id is the current transaction.
+	 * Do not call this function from places outside ExternalRDBStore
+	 *
+	 * @param $conn DatabaseBase
+	 * @param $id string Transaction ID (defaults to current)
+	 * @return void
+	 * @throws DBUnexpectedError
+	 */
+	public function connRollbackXAInternal( DatabaseBase $conn, $id = null ) {
+		if ( !$id ) {
+			$id = $this->trxIdXA;
+		}
+		$dbType = $conn->getType();
+		if ( $dbType !== 'mysql' && $dbType !== 'postgres' ) {
+			throw new DBUnexpectedError( $conn, "DB does not support 2PC." );
+		}
+		$statement = ( $dbType === 'mysql' ) ? 'XA ROLLBACK' : 'ROLLBACK PREPARED';
+		$conn->query( "{$statement} {$conn->addQuotes($id)}", __METHOD__ );
+		unset( $conn->xa_trx_id ); // done
+		$isCurrentTrx = ( $id === $this->trxIdXA ); // from begin()/finish()?
+		if ( $isCurrentTrx ) {
+			$this->unregisterConnTrxInternal( $conn );
+		}
+	}
+
+ /**
+ * Check whether a transaction ID is among the prepared transactions that
+ * connGetAllPreparedXAInternal() reports for this connection.
+ * Do not call this function from places outside ExternalRDBStore
+ *
+ * @param $conn DatabaseBase
+ * @param $id string Transaction ID
+ * @return bool
+ * @throws DBUnexpectedError
+ */
+ public function connExistsXAInternal( DatabaseBase $conn, $id ) {
+ // Note: in_array() is used in its default (loose comparison) mode here
+ return in_array( $id, $this->connGetAllPreparedXAInternal( $conn ) );
+ }
+
+	/**
+	 * List the IDs of all transactions currently in the prepared state on a DB server.
+	 * MySQL reports them via XA RECOVER, PostgreSQL via the pg_prepared_xacts view.
+	 * Do not call this function from places outside ExternalRDBStore
+	 *
+	 * @param $conn DatabaseBase
+	 * @return Array List of prepared transaction IDs
+	 * @throws DBUnexpectedError
+	 */
+	public function connGetAllPreparedXAInternal( DatabaseBase $conn ) {
+		$trxIds = array();
+		if ( $conn->getType() === 'mysql' ) {
+			foreach ( $conn->query( "XA RECOVER", __METHOD__ ) as $row ) {
+				$trxIds[] = $row->data;
+			}
+		} elseif ( $conn->getType() === 'postgres' ) {
+			foreach ( $conn->query( "SELECT * FROM pg_prepared_xacts", __METHOD__ ) as $row ) {
+				// "gid" is the identifier given to PREPARE TRANSACTION; the "transaction"
+				// column is the server-internal numeric xid and never matches our IDs.
+				$trxIds[] = $row->gid;
+			}
+		} else {
+			throw new DBUnexpectedError( $conn, "DB does not support 2PC." );
+		}
+		return $trxIds;
+	}
+}
diff --git a/includes/rdbstore/ExternalRDBStoreTablePartition.php b/includes/rdbstore/ExternalRDBStoreTablePartition.php
new file mode 100644
index 000000000000..ca217c0d2f9a
--- /dev/null
+++ b/includes/rdbstore/ExternalRDBStoreTablePartition.php
@@ -0,0 +1,89 @@
+<?php
+/**
+ * This file deals with sharded RDBMs stores.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup RDBStore
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class representing a single partition of a virtual DB table
+ *
+ * @ingroup RDBStore
+ * @since 1.20
+ */
+class ExternalRDBStoreTablePartition extends RDBStoreTablePartition {
+	/** @var LoadBalancer */
+	protected $lb; // load balancer of the cluster that serves this partition index
+
+	/**
+	 * @param $rdbStore ExternalRDBStore
+	 * @param $table string Virtual table name
+	 * @param $index integer Partition index
+	 * @param $key string Shard key column name
+	 * @param $value string|null Shard key column value (stored cast to string)
+	 * @param $wiki string Wiki ID
+	 */
+	public function __construct(
+		ExternalRDBStore $rdbStore, $table, $index, $key, $value, $wiki
+	) {
+		$this->rdbStore = $rdbStore;
+		$this->lb = $rdbStore->getClusterLBForIndex( $index );
+		$this->vTable = (string)$table;
+		// Partition table name: "<table>__<zero-padded index>__<shard key column>"
+		$this->pTable = $table . '__' . $rdbStore->formatShardIndex( $index ) . '__' . $key;
+		$this->key = (string)$key;
+		$this->value = (string)$value;
+		$this->wiki = (string)$wiki;
+	}
+
+	/**
+	 * Get a slave connection for this partition's cluster.
+	 * While the store has a transaction, the connection is flagged DBO_TRX and
+	 * registered with the store; otherwise DBO_TRX is cleared.
+	 *
+	 * @see RDBStoreTablePartition
+	 * @return DatabaseBase
+	 */
+	public function getSlaveDB() {
+		$conn = $this->lb->getConnection( DB_SLAVE, array(), $this->wiki );
+		if ( !$this->rdbStore->hasTransaction() ) {
+			$conn->clearFlag( DBO_TRX ); // auto-commit by default
+			return $conn;
+		}
+		$conn->setFlag( DBO_TRX ); // wrap queries in one transaction
+		$this->rdbStore->registerConnTrxInternal( $conn );
+		return $conn;
+	}
+
+	/**
+	 * Get a master connection for this partition's cluster.
+	 * While the store has a transaction, the connection either joins the store's
+	 * XA transaction (when an XA transaction ID is set) or is flagged DBO_TRX and
+	 * registered with the store; otherwise DBO_TRX is cleared.
+	 *
+	 * @see RDBStoreTablePartition
+	 * @return DatabaseBase
+	 */
+	public function getMasterDB() {
+		$conn = $this->lb->getConnection( DB_MASTER, array(), $this->wiki );
+		if ( !$this->rdbStore->hasTransaction() ) {
+			$conn->clearFlag( DBO_TRX ); // auto-commit by default
+			return $conn;
+		}
+		if ( $this->rdbStore->getTrxIdXAInternal() ) { // XA transaction
+			$clusterInfo = $this->lb->parentInfo(); // DB cluster info
+			$this->rdbStore->connStartXAInternal( $conn, $clusterInfo['id'] );
+			return $conn;
+		}
+		// non-XA transaction
+		$conn->setFlag( DBO_TRX ); // wrap queries in one transaction
+		$this->rdbStore->registerConnTrxInternal( $conn );
+		return $conn;
+	}
+}
diff --git a/includes/rdbstore/ExternalRDBStoreTrxJournal.php b/includes/rdbstore/ExternalRDBStoreTrxJournal.php
new file mode 100644
index 000000000000..6d84c3698228
--- /dev/null
+++ b/includes/rdbstore/ExternalRDBStoreTrxJournal.php
@@ -0,0 +1,278 @@
+<?php
+/**
+ * This file deals with sharded RDBMs stores.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup RDBStore
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class that helps with cross-shard commits and anomaly correction in external RDB stores.
+ * This logs the transaction ID and affected servers before the PREPARE/COMMIT phases.
+ * Distributed transactions that use this class should essentially work as follows:
+ * - a) The coordinator prepare transaction journal is updated
+ * - b) Changes are proposed on the relevant DBs, starting XA transactions
+ * - c) The coordinator commit transaction journal is updated
+ * - d) Changes are prepared and committed on the relevant DBs, ending XA transactions
+ * These log files are deleted when everything is committed successfully. Old log files are
+ * assumed to correspond to failed transactions and are used to repair the affected DB rows.
+ *
+ * @ingroup RDBStore
+ * @since 1.20
+ */
+class ExternalRDBStoreTrxJournal {
+	/** @var ExternalRDBStore */
+	protected $rdbStore;
+
+	/** @var string File system path of the directory holding the .tlog files */
+	protected $directory;
+	/** @var string|null Global ID (timestamped UUID) of the transaction being journaled */
+	protected $trxId;
+	/** @var integer One of the self::PHASE_* constants */
+	protected $phase = self::PHASE_READY;
+
+	const PHASE_READY = 1; // not in a transaction
+	const PHASE_PREPARING = 2; // ready for PREPAREs
+	const PHASE_COMMITTING = 3; // agreement on all changes, ready for COMMITs
+
+	const STALE_SEC = 900; // how old a .tlog file must be to be "stale"
+	const CHECK_SEC = 60; // period for checking for stale .tlog files
+
+	/**
+	 * @param $rdbStore ExternalRDBStore
+	 */
+	public function __construct( ExternalRDBStore $rdbStore ) {
+		$this->rdbStore = $rdbStore;
+		// Each store gets its own sub-directory, named after the URL-encoded store name
+		$this->directory = wfTempDir() . '/mw-rdbstore/' . rawurlencode( $rdbStore->getName() );
+	}
+
+	/**
+	 * Function to be called right before transaction PREPARE.
+	 * Writes the "<trx ID>.prepare.tlog" file and moves to the PREPARING phase.
+	 *
+	 * @param $trxId string Unique global transaction ID
+	 * @param $clusters array List of DB cluster names for each server
+	 * @return void
+	 * @throws MWException If a transaction is already journaled or the file cannot be written
+	 */
+	public function onPrePrepare( $trxId, array $clusters ) {
+		if ( $this->phase !== self::PHASE_READY ) {
+			throw new MWException( "Transaction already in progress." );
+		}
+		$this->trxId = $trxId;
+		// Log the PREPARE attempt to a file in /tmp space
+		$data = serialize( array( 'clusters' => $clusters ) );
+		wfProfileIn( __METHOD__ . '-log' );
+		wfMkdirParents( $this->directory ); // create dir if needed
+		$bytes = file_put_contents( "{$this->directory}/{$this->trxId}.prepare.tlog", $data );
+		wfProfileOut( __METHOD__ . '-log' );
+		if ( $bytes !== strlen( $data ) ) { // false or a short write
+			throw new MWException( "Could not write to '{$this->trxId}.prepare.tlog' file." );
+		}
+		wfDebugLog( "RDBStoreTrx", "{$this->rdbStore->getName()}\tPREPARE\t{$this->trxId}\n" );
+		$this->phase = self::PHASE_PREPARING;
+	}
+
+	/**
+	 * Function to be called right before transaction COMMIT.
+	 * Copies the prepare log to "<trx ID>.commit.tlog" and moves to the COMMITTING phase.
+	 *
+	 * @return void
+	 * @throws MWException If onPrePrepare() was not called or the file cannot be created
+	 */
+	public function onPreCommit() {
+		if ( $this->phase !== self::PHASE_PREPARING ) {
+			throw new MWException( "Transaction not prepared." );
+		}
+		// Log the COMMIT decision to a file in /tmp space
+		wfProfileIn( __METHOD__ . '-log' );
+		$ok = $this->fsyncCopy(
+			"{$this->directory}/{$this->trxId}.prepare.tlog",
+			"{$this->directory}/{$this->trxId}.commit.tlog"
+		);
+		wfProfileOut( __METHOD__ . '-log' );
+		if ( !$ok ) {
+			throw new MWException( "Could not create '{$this->trxId}.commit.tlog' file." );
+		}
+		wfDebugLog( "RDBStoreTrx", "{$this->rdbStore->getName()} COMMIT {$this->trxId}\n" );
+		$this->phase = self::PHASE_COMMITTING;
+	}
+
+	/**
+	 * Function to be called right after transaction COMMIT.
+	 * Removes both .tlog files of the transaction and returns to the READY phase.
+	 *
+	 * @return void
+	 * @throws MWException If onPreCommit() was not called
+	 */
+	public function onPostCommit() {
+		if ( $this->phase !== self::PHASE_COMMITTING ) {
+			throw new MWException( "Transaction is not ready to commit." );
+		}
+		wfProfileIn( __METHOD__ . '-log' );
+		unlink( "{$this->directory}/{$this->trxId}.prepare.tlog" ); // not needed anymore
+		unlink( "{$this->directory}/{$this->trxId}.commit.tlog" ); // not needed anymore
+		wfProfileOut( __METHOD__ . '-log' );
+		$this->trxId = null;
+		$this->phase = self::PHASE_READY;
+	}
+
+	/**
+	 * Copy a file, using "dd conv=fsync" on non-Windows systems and copy() on Windows.
+	 *
+	 * @param $src string Source file path
+	 * @param $dst string Destination file path
+	 * @return bool Success
+	 */
+	protected function fsyncCopy( $src, $dst ) {
+		if ( !wfIsWindows() ) { // *nix
+			$encSrc = wfEscapeShellArg( $src );
+			$encDst = wfEscapeShellArg( $dst );
+			$retVal = 1; // assume failure unless the shell call reports otherwise
+			wfShellExec( "dd if={$encSrc} of={$encDst} conv=fsync", $retVal );
+			return ( $retVal == 0 );
+		} else { // windows
+			return copy( $src, $dst );
+		}
+	}
+
+	/**
+	 * Run resolveLimboTransactions() if the last check was more than CHECK_SEC seconds ago.
+	 * The time of the last check is read from the server-local cache, falling back to
+	 * the modification time of the "collect" lock file on cache miss.
+	 *
+	 * @return integer Number of limbo transactions resolved (0 if the check was skipped)
+	 */
+	public function periodicLogCheck() {
+		wfProfileIn( __METHOD__ );
+		try {
+			$cache = ObjectCache::newAccelerator( array() ); // local server cache
+		} catch ( MWException $e ) {
+			$cache = new EmptyBagOStuff();
+		}
+		$key = "rdbstore-collect-{$this->rdbStore->getName()}";
+		$time = $cache->get( $key );
+		if ( !is_int( $time ) ) { // cache miss
+			wfSuppressWarnings();
+			$time = filemtime( "{$this->directory}/collect" ); // false if file doesn't exist
+			wfRestoreWarnings();
+			$cache->set( $key, (int)$time, 86400 );
+		}
+		if ( $time < ( time() - self::CHECK_SEC ) ) {
+			$cache->set( $key, time(), 86400 );
+			wfDebug( __METHOD__ . ": Checking for stale tlog files.\n" );
+			// resolveLimboTransactions() returns false when it cannot get the lock;
+			// normalize that to 0 so this method always returns an integer
+			$count = (int)$this->resolveLimboTransactions( 10 );
+		} else {
+			wfDebug( __METHOD__ . ": Skipping check for stale tlog files.\n" );
+			$count = 0;
+		}
+		wfProfileOut( __METHOD__ );
+		return $count;
+	}
+
+	/**
+	 * Find stale .tlog files lying around in /tmp and finish the corresponding
+	 * DB transactions. The .tlog files are removed once the transactions resolve.
+	 *
+	 * @param $maxTrxCount integer Maximum number of transactions to resolve (0 = no limit)
+	 * @return integer|bool Number of transactions resolved; false if the "collect"
+	 *   lock file could not be opened or is locked by another process
+	 * @throws Exception Anything thrown while resolving is re-thrown after unlocking
+	 */
+	public function resolveLimboTransactions( $maxTrxCount = 0 ) {
+		wfProfileIn( __METHOD__ );
+
+		// Make sure only one process on this server is doing this...
+		wfMkdirParents( $this->directory ); // create dir if needed
+		$lockFileHandle = fopen( "{$this->directory}/collect", 'w' ); // open & touch
+		if ( !$lockFileHandle ) {
+			wfProfileOut( __METHOD__ );
+			return false; // bail out
+		} elseif ( !flock( $lockFileHandle, LOCK_EX | LOCK_NB ) ) {
+			fclose( $lockFileHandle );
+			wfProfileOut( __METHOD__ );
+			return false; // someone else probably beat us
+		}
+
+		$e = null; // exception
+		$countDone = 0; // transactions resolved so far
+		try {
+			$logs = $this->getStaleLogs();
+			wfDebugLog( 'RDBStore', "Found " . count( $logs ) . " limbo transaction(s).\n" );
+			// Run through transactions that failed and resolve them...
+			foreach ( $logs as $trxId => $trxInfo ) {
+				if ( $this->finishTransaction( $trxId, $trxInfo ) ) {
+					++$countDone; // another transaction resolved
+					if ( file_exists( "{$this->directory}/{$trxId}.prepare.tlog" ) ) {
+						unlink( "{$this->directory}/{$trxId}.prepare.tlog" );
+					}
+					if ( file_exists( "{$this->directory}/{$trxId}.commit.tlog" ) ) {
+						unlink( "{$this->directory}/{$trxId}.commit.tlog" );
+					}
+					if ( $maxTrxCount && $countDone >= $maxTrxCount ) {
+						break; // only handle so many transactions
+					}
+				}
+			}
+			wfDebugLog( 'RDBStore', "Repaired $countDone limbo transaction(s).\n" );
+		} catch ( Exception $e ) {} // re-thrown below, once the lock is released
+
+		// Release locks so other processes can poll these files...
+		flock( $lockFileHandle, LOCK_UN );
+		fclose( $lockFileHandle );
+
+		wfProfileOut( __METHOD__ );
+		if ( $e instanceof Exception ) {
+			throw $e; // throw back any exception
+		}
+
+		// Report what was actually resolved, not merely what was found
+		return $countDone;
+	}
+
+	/**
+	 * Scan the journal directory for .tlog files older than STALE_SEC seconds.
+	 *
+	 * @return Array Map of (trx ID => array with the logged 'clusters' list and a
+	 *   'step' of 'prepare' or 'commit'); a commit log wins over a prepare log
+	 */
+	protected function getStaleLogs() {
+		$logs = array();
+		$dirHandle = opendir( $this->directory );
+		if ( !$dirHandle ) {
+			return $logs;
+		}
+		$cutoff = time() - self::STALE_SEC;
+		$fileRegex = '/^(\d{14}-[0-9a-f]{32})\.(commit|prepare)\.tlog$/';
+		// Compare with false explicitly: an entry named "0" is falsy and
+		// would otherwise end the directory scan early
+		while ( ( $file = readdir( $dirHandle ) ) !== false ) {
+			$m = array(); // matches
+			if ( !preg_match( $fileRegex, $file, $m ) ) {
+				continue; // sanity
+			} elseif ( filemtime( "{$this->directory}/{$file}" ) > $cutoff ) {
+				continue; // wait; transaction may still be in progress
+			}
+			list( /*all*/, $trxId, $step ) = $m;
+			if ( $step === 'commit' || !isset( $logs[$trxId] ) ) { // commit has precedence
+				// NOTE(review): unserialize() is only safe here as long as nothing
+				// untrusted can write into this directory -- confirm for shared /tmp
+				$info = unserialize( file_get_contents( "{$this->directory}/{$file}" ) );
+				if ( is_array( $info ) ) {
+					$logs[$trxId] = $info + array( 'step' => $step );
+				}
+			}
+		}
+		closedir( $dirHandle );
+		return $logs;
+	}
+
+	/**
+	 * Commit or roll back a limbo transaction on the master of each logged cluster.
+	 * The transaction is committed if a commit log was found for it and rolled
+	 * back otherwise. Clusters where the transaction is not found are skipped.
+	 *
+	 * @param $trxId string Global transaction ID
+	 * @param $trxInfo array Map with the 'clusters' list and the 'step' that was logged
+	 * @return boolean
+	 */
+	protected function finishTransaction( $trxId, array $trxInfo ) {
+		foreach ( $trxInfo['clusters'] as $cluster ) { // for each cohort
+			$lb = wfGetLBFactory()->getExternalLB( $cluster ); // LB for cohort
+			$dbw = $lb->getConnection( DB_MASTER ); // DB master
+			$dbw->clearFlag( DBO_TRX ); // don't wrap in BEGIN
+			if ( $this->rdbStore->connExistsXAInternal( $dbw, $trxId ) ) { // still pending here?
+				if ( $trxInfo['step'] === 'commit' ) { // COMMIT decision made
+					$this->rdbStore->connCommitXAInternal( $dbw, '2PC', $trxId );
+					wfDebugLog( 'RDBStore', "Committed transaction '$trxId'.\n" );
+				} else { // no COMMIT decision made; ROLLBACK
+					$this->rdbStore->connRollbackXAInternal( $dbw, $trxId );
+					wfDebugLog( 'RDBStore', "Rolled back transaction '$trxId'.\n" );
+				}
+			} else {
+				wfDebugLog( 'RDBStore', "Could not find transaction '$trxId'.\n" );
+			}
+		}
+		return true;
+	}
+}
diff --git a/includes/rdbstore/LocalRDBStore.php b/includes/rdbstore/LocalRDBStore.php
new file mode 100644
index 000000000000..0e4ec6574e86
--- /dev/null
+++ b/includes/rdbstore/LocalRDBStore.php
@@ -0,0 +1,128 @@
+<?php
+/**
+ * This file deals with local RDBMs stores.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup RDBStore
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class representing a simple, non-external, DB storage system.
+ * Tables are not sharded, and only the wiki cluster is used.
+ *
+ * @ingroup RDBStore
+ * @since 1.20
+ */
+class LocalRDBStore extends RDBStore {
+	/** @var LoadBalancer */
+	protected $lb;
+
+	/** @var string Wiki ID this store is bound to */
+	protected $wiki;
+
+	/**
+	 * @param $options array Recognized keys:
+	 *   - wiki : Wiki ID; false (or omitted) means the current wiki
+	 */
+	public function __construct( array $options ) {
+		// Treat a missing 'wiki' key like false instead of raising a notice
+		// and ending up with a null wiki ID
+		$wiki = isset( $options['wiki'] ) ? $options['wiki'] : false;
+		$this->wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+		// Use a separate connection so this handles transactions like external RDB stores
+		$this->lb = wfGetLBFactory()->newMainLB( $this->wiki );
+	}
+
+	/**
+	 * @see RDBStore::getName()
+	 * @return string
+	 */
+	public function getName() {
+		return "local-{$this->wiki}";
+	}
+
+	/**
+	 * @see RDBStore::isInternal()
+	 * @return bool
+	 */
+	public function isInternal() {
+		return true;
+	}
+
+	/**
+	 * @see RDBStore::isPartitioned()
+	 * @return bool
+	 */
+	public function isPartitioned() {
+		return false;
+	}
+
+	/**
+	 * @see RDBStore::beginOutermost()
+	 * @see DatabaseBase::begin()
+	 * @return void
+	 */
+	protected function beginOutermost() {
+		$this->getMasterDB()->begin();
+	}
+
+	/**
+	 * @see RDBStore::finishOutermost()
+	 * @see DatabaseBase::commit()
+	 * @return void
+	 */
+	protected function finishOutermost() {
+		$this->getMasterDB()->commit();
+	}
+
+	/**
+	 * @see RDBStore::doGetPartition()
+	 * @return LocalRDBStoreTablePartition
+	 * @throws DBUnexpectedError If $wiki is not the wiki of this store
+	 */
+	protected function doGetPartition( $table, $column, $value, $wiki ) {
+		$this->assertWikiMatches( $wiki );
+		return new LocalRDBStoreTablePartition( $this, $table, $column, $value, $wiki );
+	}
+
+	/**
+	 * @see RDBStore::doGetAllPartitions()
+	 * @return Array List of LocalRDBStoreTablePartition objects
+	 * @throws DBUnexpectedError If $wiki is not the wiki of this store
+	 */
+	protected function doGetAllPartitions( $table, $column, $wiki ) {
+		$this->assertWikiMatches( $wiki );
+		// The table is not sharded, so a single partition without a shard value covers it
+		return array( new LocalRDBStoreTablePartition( $this, $table, $column, null, $wiki ) );
+	}
+
+	/**
+	 * Make sure a requested wiki ID is the one this store was created for
+	 *
+	 * @param $wiki string Wiki ID
+	 * @return void
+	 * @throws DBUnexpectedError
+	 */
+	protected function assertWikiMatches( $wiki ) {
+		if ( $wiki !== $this->wiki ) {
+			// DBError constructors take the DB handle first and the message second
+			throw new DBUnexpectedError(
+				null, "Wiki ID '$wiki' does not match '{$this->wiki}'." );
+		}
+	}
+
+	/**
+	 * Outside callers should generally not need this and should avoid using it
+	 *
+	 * @return DatabaseBase
+	 */
+	public function getSlaveDB() {
+		return $this->lb->getConnection( DB_SLAVE, array(), $this->wiki );
+	}
+
+	/**
+	 * Outside callers should generally not need this and should avoid using it
+	 *
+	 * @return DatabaseBase
+	 */
+	public function getMasterDB() {
+		return $this->lb->getConnection( DB_MASTER, array(), $this->wiki );
+	}
+}
diff --git a/includes/rdbstore/LocalRDBStoreTablePartition.php b/includes/rdbstore/LocalRDBStoreTablePartition.php
new file mode 100644
index 000000000000..cad264001b40
--- /dev/null
+++ b/includes/rdbstore/LocalRDBStoreTablePartition.php
@@ -0,0 +1,64 @@
+<?php
+/**
+ * This file deals with local RDBMs stores.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup RDBStore
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class representing a single partition of a virtual DB table.
+ * This is just a regular table on the non-external main DB.
+ *
+ * @ingroup RDBStore
+ * @since 1.20
+ */
+class LocalRDBStoreTablePartition extends RDBStoreTablePartition {
+	/**
+	 * @param $rdbStore LocalRDBStore Store that this table lives in
+	 * @param $table string Table name; used as both the virtual and the physical name
+	 * @param $key string Shard key column name
+	 * @param $value string|null Shard key column value (stored as a string)
+	 * @param $wiki string Wiki ID
+	 */
+	public function __construct( LocalRDBStore $rdbStore, $table, $key, $value, $wiki ) {
+		$tableName = (string)$table;
+
+		$this->rdbStore = $rdbStore;
+		$this->wiki = (string)$wiki;
+		// The table is not sharded, so the virtual and physical names coincide
+		$this->vTable = $tableName;
+		$this->pTable = $tableName;
+		$this->key = (string)$key;
+		$this->value = (string)$value;
+	}
+
+	/**
+	 * Get the slave connection of the owning store
+	 *
+	 * @see RDBStoreTablePartition::getSlaveDB()
+	 * @return DatabaseBase
+	 */
+	public function getSlaveDB() {
+		$dbr = $this->rdbStore->getSlaveDB();
+		return $dbr;
+	}
+
+	/**
+	 * Get the master connection of the owning store
+	 *
+	 * @see RDBStoreTablePartition::getMasterDB()
+	 * @return DatabaseBase
+	 */
+	public function getMasterDB() {
+		$dbw = $this->rdbStore->getMasterDB();
+		return $dbw;
+	}
+}
diff --git a/includes/rdbstore/RDBStore.php b/includes/rdbstore/RDBStore.php
new file mode 100644
index 000000000000..aa7aaddddc26
--- /dev/null
+++ b/includes/rdbstore/RDBStore.php
@@ -0,0 +1,354 @@
+<?php
+/**
+ * @defgroup RDBStore RDBStore
+ *
+ * This file deals with shardable RDBMs stores.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup RDBStore
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class representing a relational DB storage system.
+ * Callers access tables as if they were horizontally partitioned,
+ * which may actually be the case for large, external, RDB stores.
+ *
+ * Partitioning for each table is based on a single non-NULL column.
+ * Only the column value will be used to determine partitions, not the column name.
+ *
+ * Read queries wrapped in begin() and finish() will use the default isolation level of the DB.
+ * All DB servers should be configured to use REPEATABLE-READ by default (like stock MySQL) so
+ * that separate read queries in a transaction will be consistent with a single snapshot in time.
+ * Queries performed on different DBs will see different point-in-time snapshots for each DB,
+ * since each will have its own local transaction.
+ *
+ * Note that cross-DB queries are not supported in this class. Tables should be sharded such
+ * that all queries only need to affect a single shard. If this is not possible, one can:
+ * - a) Have a canonical and a duplicate version of a table, each sharded in different ways.
+ * For example, the former being sharded on page ID and the latter on user ID.
+ * Read queries for page X or user Y could thus be routed to a single shard.
+ * A drawback is that this essentially doubles the physical size of the data set.
+ * - b) Have a canonical and an index version of a table, each sharded in different ways.
+ * For example, the former being sharded on page ID and the latter on user ID. The index
+ * table duplicates the main table, except only having columns used in WHERE clauses.
+ * Read queries for page X would hit a canonical table shard, while read queries for user
+ * Y would hit an index shard and then fetch the full rows from the main table shards.
+ * This takes up less space than (a) and can reduce the chance for cross-shard updates.
+ * A drawback is that user=Y queries require O(N) queries for result sets of size N.
+ * This should only be done if N is very small or if it is heavily cached (memcached).
+ * - c) Have a duplicate time-based rolling table stored in the local DB, if possible.
+ * For example, if a thread table is sharded on thread ID, one could also store the last
+ * few months of rows in a local DB table. Since the table is not sharded, anything that
+ * is properly indexed could be queried and JOINs could be done with local tables.
+ * Data inconsistencies are self-correcting as all rows are eventually rotated out.
+ * Cross-DB transactions may not be atomic, so options (a) and (b) can have permanent anomalies.
+ * Reads used for writes should always be done on the canonical table shards as good practice.
+ * For (a), if rows are only inserted and they have an indexed creation timestamp field, it's
+ * possible to synchronize denormalized data with a repair script. For (b), if rows are only
+ * inserted/updated, they have an indexed creation timestamp field, and all of the "index table"
+ * columns are immutable, then it's possible to synchronize denormalized data with a repair script.
+ *
+ * Cross-DB transactions may not actually be atomic. When COMMITs are issued to each DB,
+ * it is possible for some COMMITs to succeed while others fail, though this should be rare.
+ * If this is problematic one might be able to do the following:
+ * - a) Use certain MVCC patterns. For example, if a local DB row needs to be created,
+ * referencing several RDB store blobs, one can commit all the blobs first, using
+ * UUIDs as foreign keys, updating the local DB only if all the blobs were committed.
+ * If the row and blobs need to be updated, new blobs with different UUIDs can be used.
+ * Failures just leave unreferenced garbage in the worst case scenario.
+ * - b) Partition tables such that related rows of different tables map to the same DB.
+ * This is useful if, for example, a table is partitioned on page ID but a table that
+ * stores aggregate data by page ID is needed. The latter can simply be partitioned
+ * by page ID, ensuring that rows for either table for any page X will be on the same DB.
+ * Sharded stores can be configured to use Two-Phase-Commit (2PC) for cross-DB transactions.
+ * PostgreSQL8 supports 2PC and MySQL 5 also has 2PC Open XA statements (useless in MySQL 5.X,
+ * planned to be fixed in MySQL 6.1). Limbo transactions are periodically resolved by the same
+ * application servers that initiated them. If they are unable to do so, then the transactions
+ * have to be resolved manually or by a repair script. Limbo transactions hold persistent locks.
+ * @see http://dev.mysql.com/doc/refman/5.0/en/xa-states.html
+ * @see http://www.postgresql.org/docs/9.1/static/sql-prepare-transaction.html
+ *
+ * When callers use begin() and finish(), the data might not be committed until later if another
+ * caller already called begin() and has yet to call finish(). Output to the user can be buffered
+ * in the normal means (e.g. OutputPage) in a way that blindly assumes it has been committed. If
+ * the commit fails, a DBError exception will be thrown and the buffered output will be tossed.
+ * Things like emails or job insertion, which need to happen only after successful commit, can be
+ * made to happen in a callback function. This callback can be passed to finish(), and will only
+ * be triggered after successful commit.
+ *
+ * Example usage from an extension:
+ * @code
+ * $rdbs = RDBStoreGroup::singleton()->getForTable( 'ext-myextension' );
+ * $tp = $rdbs->getPartition( 'comments', 'thread', 501 );
+ * $res = $tp->select( DB_SLAVE, '*', array( 'thread' => 501, 'comment' => 11 ), __METHOD__ );
+ * @endcode
+ *
+ * @code
+ * $rdbs = RDBStoreGroup::singleton()->getForTable( 'ext-myextension' );
+ * $rdbs->begin();
+ * $rdbs->getPartition( 'user_info', 'user', $row['user'] )->insert( $row, __METHOD__ );
+ * $rdbs->finish();
+ * @endcode
+ *
+ * @code
+ * $rdbs = RDBStoreGroup::singleton()->getForTableGroup( 'ext-myextension' );
+ * $rdbs->begin();
+ * // This table is sharded by user and duplicated to shard on event
+ * $places = $rdbs->columnValueMap( $row, array( 'user', 'event' ) );
+ * // Insert the subscription row into canonical and duplicate shards...
+ * foreach ( $rdbs->getEachPartition( 'subscriptions', $places ) as $tp ) {
+ * $tp->insert( $row, __METHOD__ );
+ * }
+ * $rdbs->finish();
+ * @endcode
+ *
+ * @code
+ * $rdbs = RDBStoreGroup::singleton()->getForTableGroup( 'ext-myextension' );
+ * $rdbs->begin();
+ * // Insert the comment row into the canonical shard...
+ * $rdbs->getPartition( 'comments', 'thread', $row['thread'] )->insert( $row, __METHOD__ );
+ * // Insert a row that references the comment row into the user index shard...
+ * $irow = $rdbs->columnValueMap( $row, array( 'user', 'thread', 'comment' ) );
+ * $rdbs->getPartition( 'comments', 'user', $row['user'] )->insert( $irow, __METHOD__ );
+ * // Update the stats for the thread...
+ * $tp = $rdbs->getPartition( 'thread_stats', 'thread', $row['thread'] );
+ * $tp->update( 'thread_stats', array(...), array( 'thread' => $row['thread'] ), __METHOD__ );
+ * $rdbs->finish( function() use ( $row ) {
+ * MyExtension::enqueueNotifyJobs( $row );
+ * } );
+ * @endcode
+ *
+ * @ingroup RDBStore
+ * @since 1.20
+ */
+abstract class RDBStore {
+	/** @var integer Number of begin() calls not yet matched by finish() */
+	protected $trxDepth = 0;
+	/** @var float UNIX timestamp taken when the outermost transaction began */
+	protected $trxStartTime;
+	/** @var Array List of post-commit callbacks */
+	protected $postCommitCallbacks = array();
+
+	/**
+	 * @param $options array
+	 */
+	public function __construct( array $options ) {}
+
+	/**
+	 * @return string Name of this store
+	 */
+	abstract public function getName();
+
+	/**
+	 * @return bool Whether this store is the primary DB of a wiki
+	 */
+	abstract public function isInternal();
+
+	/**
+	 * @return bool Tables are actually partitioned in the store
+	 */
+	abstract public function isPartitioned();
+
+	/**
+	 * Check if the store is currently in a DB transaction.
+	 * Outside callers should generally not need this and should avoid using it.
+	 *
+	 * @return bool
+	 */
+	final public function hasTransaction() {
+		return ( $this->trxDepth > 0 );
+	}
+
+	/**
+	 * Increment the transaction counter and begin a transaction if the counter was zero
+	 *
+	 * @return bool DB transaction actually started
+	 * @throws DBError
+	 */
+	final public function begin() {
+		if ( $this->trxDepth == 0 ) {
+			$this->beginOutermost(); // BEGIN for the outermost transaction
+			$this->trxStartTime = microtime( true );
+		}
+		++$this->trxDepth; // increment the transaction counter in any case
+		wfDebug( __METHOD__ . ": transaction nesting level now {$this->trxDepth}.\n" );
+		return ( $this->trxDepth == 1 );
+	}
+
+	/**
+	 * @see RDBStore::begin()
+	 * @return void
+	 */
+	abstract protected function beginOutermost();
+
+	/**
+	 * Decrement the transaction counter and commit the transaction if the counter is zero.
+	 *
+	 * Callers can register post-commit callback functions (including closures) here.
+	 * When the transaction is committed, any post-commit callback functions are called.
+	 * Every callback is run even if an earlier one throws; the last exception thrown
+	 * by a callback is then passed on to the caller.
+	 *
+	 * @param $callback Closure Optional function to call after database COMMIT
+	 * @return bool DB transaction actually committed
+	 * @throws DBError
+	 * @throws MWException If there is no begin() call that this call matches
+	 */
+	final public function finish( Closure $callback = null ) {
+		if ( $this->trxDepth <= 0 ) {
+			throw new MWException( "Detected unmatched finish() call." );
+		}
+		if ( $callback !== null ) {
+			// Register post-commit callback given by the caller
+			$this->postCommitCallbacks[] = $callback;
+		}
+		if ( $this->trxDepth == 1 ) {
+			$this->finishOutermost(); // COMMIT for the outermost transaction
+		}
+		--$this->trxDepth; // decrement the transaction counter in any case
+		wfDebug( __METHOD__ . ": transaction nesting level now {$this->trxDepth}.\n" );
+		// Once the transaction level is 0, anything must have been committed.
+		// Trigger any post-commit callbacks (which assume the data is committed).
+		if ( $this->trxDepth == 0 ) {
+			$e = null; // any exception
+			$callbacks = $this->postCommitCallbacks;
+			$this->postCommitCallbacks = array(); // recursion guard
+			foreach ( $callbacks as $postCommitCallback ) {
+				try {
+					$postCommitCallback();
+				} catch ( Exception $e ) {} // keep going; re-thrown below
+			}
+			if ( $e instanceof Exception ) {
+				throw $e; // throw back the last exception
+			}
+		}
+		return ( $this->trxDepth == 0 );
+	}
+
+	/**
+	 * @see RDBStore::finish()
+	 * @return void
+	 */
+	abstract protected function finishOutermost();
+
+	/**
+	 * Get an object representing a shard of a virtual DB table.
+	 * If this store is not partitioned, this returns an object for the entire table.
+	 *
+	 * Each table is canonically sharded on one column key, and may possibly be
+	 * denormalized and sharded on additional column keys (e.g. thread ID, user ID).
+	 *
+	 * @param $table string Virtual table name
+	 * @param $column string Shard key column name
+	 * @param $value string Shard key column value
+	 * @param $wiki string Wiki ID; defaults to the current wiki
+	 * @return RDBStoreTablePartition
+	 * @throws MWException
+	 */
+	final public function getPartition( $table, $column, $value, $wiki = false ) {
+		$wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+		if ( !isset( $column ) || !isset( $value ) ) {
+			throw new MWException( "Missing table shard column or value." );
+		}
+		return $this->doGetPartition( $table, $column, $value, $wiki );
+	}
+
+	/**
+	 * @see RDBStore::getPartition()
+	 * @return RDBStoreTablePartition
+	 */
+	abstract protected function doGetPartition( $table, $column, $value, $wiki );
+
+	/**
+	 * Get a list of objects representing all shards of a virtual DB table.
+	 * If this store is not partitioned, the list has one object for the entire table.
+	 *
+	 * Each table is canonically sharded on one column key, and may possibly be
+	 * denormalized and sharded on additional column keys (e.g. thread ID, user ID).
+	 *
+	 * @param $table string Virtual table name
+	 * @param $column string Shard key column name
+	 * @param $wiki string Wiki ID; defaults to the current wiki
+	 * @return Array List of RDBStoreTablePartition objects
+	 * @throws MWException
+	 */
+	final public function getAllPartitions( $table, $column, $wiki = false ) {
+		$wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+		if ( !isset( $column ) ) {
+			throw new MWException( "Missing table shard column." );
+		}
+		return $this->doGetAllPartitions( $table, $column, $wiki );
+	}
+
+	/**
+	 * @see RDBStore::getAllPartitions()
+	 * @return Array List of RDBStoreTablePartition objects
+	 */
+	abstract protected function doGetAllPartitions( $table, $column, $wiki );
+
+	/**
+	 * Get the column to value map for a row for a given list of columns.
+	 * All these columns must be present in the row or an error will be thrown.
+	 * The map lists the columns in the order given by $columns, so that it
+	 * can be used to fix the write order in RDBStore::getEachPartition().
+	 *
+	 * @param $row Row|array
+	 * @param $columns array List of column names
+	 * @return Array Map of (column name => value)
+	 * @throws MWException
+	 */
+	final public function columnValueMap( $row, array $columns ) {
+		$row = (array)$row;
+		$map = array();
+		$missing = array();
+		// $columns is a list of names, so look each name up as a key of the row
+		foreach ( $columns as $column ) {
+			// array_key_exists() so that a column holding NULL still counts as present
+			if ( array_key_exists( $column, $row ) ) {
+				$map[$column] = $row[$column];
+			} else {
+				$missing[] = $column;
+			}
+		}
+		if ( count( $missing ) ) {
+			throw new MWException( "Row is missing column(s) " . implode( ', ', $missing ) );
+		}
+		return $map;
+	}
+
+	/**
+	 * Get a list of objects representing shards of a virtual DB table.
+	 * A shard is returned for each (shard column, column value) tuple given if the
+	 * store is partitioned, and for the first tuple if the store is not partitioned.
+	 *
+	 * This is for tables that are canonically sharded on one column key, and also
+	 * denormalized and sharded on additional column keys (e.g. thread ID, user ID).
+	 *
+	 * When writing to multiple shards, callers should always do so in a certain order.
+	 * For example, if a table is sharded on thread ID and denormalized to shard on user ID,
+	 * updates should first write to the thread shard and then to the user shard. This reduces
+	 * the chance for deadlocks arising from transaction locks on rows. If P1 locks rows on
+	 * thread partition A, and blocks on P2 for rows of user partition B, it can't be the case
+	 * that P2 is also blocked on P1 for rows of thread partition A (due to the write order).
+	 *
+	 * @param $table string Virtual table name
+	 * @param $map array Map of shard column names to values from RDBStore::columnValueMap()
+	 * @param $wiki string Wiki ID; defaults to the current wiki
+	 * @return Array Map of column names to RDBStoreTablePartition objects (same order as $map)
+	 * @throws MWException
+	 */
+	final public function getEachPartition( $table, array $map, $wiki = false ) {
+		$wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+		$partitions = array();
+		foreach ( $map as $column => $value ) {
+			if ( !is_string( $column ) ) {
+				throw new MWException( "Invalid column name." );
+			} elseif ( !is_scalar( $value ) ) {
+				throw new MWException( "Invalid value for column '$column'." );
+			}
+			$partitions[$column] = $this->getPartition( $table, $column, $value, $wiki );
+			if ( !$this->isPartitioned() ) {
+				break; // the data is all on one table
+			}
+		}
+		return $partitions;
+	}
+}
diff --git a/includes/rdbstore/RDBStoreGroup.php b/includes/rdbstore/RDBStoreGroup.php
new file mode 100644
index 000000000000..150f7b1695c2
--- /dev/null
+++ b/includes/rdbstore/RDBStoreGroup.php
@@ -0,0 +1,183 @@
+<?php
+/**
+ * This file deals with sharded database stores.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup RDBStore
+ * @author Aaron Schulz
+ */
+
+/**
+ * Factory class for getting RDBStore objects
+ *
+ * @ingroup RDBStore
+ * @since 1.20
+ */
+class RDBStoreGroup {
+	/** @var RDBStoreGroup|null Shared instance; null until singleton() is first called */
+	protected static $instance = null;
+
+	/** @var Array */
+	protected $storeConfig = array(); // (store name => config array)
+	/** @var Array */
+	protected $tableGroups = array(); // (group name => table list)
+	/** @var Array */
+	protected $tableSyncInfo = array(); // (table name => config array)
+
+	/** @var Array */
+	protected $storedTables = array(); // (wiki ID => table name => store name)
+
+	/** @var Array */
+	protected $extStoreInstances = array(); // (store name => ExternalRDBStore)
+	/** @var Array */
+	protected $intStoreInstances = array(); // (wiki ID => LocalRDBStore)
+
+	/**
+	 * Instances are only created through singleton()
+	 */
+	protected function __construct() {}
+
+	/**
+	 * Get the shared instance, creating it and loading its
+	 * configuration from the globals on first use.
+	 *
+	 * @return RDBStoreGroup
+	 */
+	public static function singleton() {
+		if ( self::$instance === null ) {
+			self::$instance = new self();
+			self::$instance->initFromGlobals();
+		}
+		return self::$instance;
+	}
+
+	/**
+	 * Destroy the singleton instance.
+	 * The next call to singleton() builds a fresh one.
+	 *
+	 * @return void
+	 */
+	public static function destroySingleton() {
+		self::$instance = null;
+	}
+
+	/**
+	 * Register DB stores from the global variables and let the
+	 * 'RDBStoreTableSchemaInfo' hook fill in table groups and sync info.
+	 *
+	 * @return void
+	 */
+	protected function initFromGlobals() {
+		global $wgRDBStores, $wgRDBStoredTables;
+
+		$this->storeConfig = $wgRDBStores;
+		$this->storedTables = $wgRDBStoredTables;
+		// Let extensions register table groups and denormalized tables
+		wfRunHooks( 'RDBStoreTableSchemaInfo',
+			array( &$this->tableGroups, &$this->tableSyncInfo ) );
+	}
+
+	/**
+	 * Get a DB store on the main cluster of a wiki.
+	 * This uses a multi-singleton pattern to track transactions.
+	 *
+	 * @param $wiki string Wiki ID; defaults to the current wiki
+	 * @return LocalRDBStore
+	 */
+	public function getInternal( $wiki = false ) {
+		$wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+		if ( !isset( $this->intStoreInstances[$wiki] ) ) {
+			$this->intStoreInstances[$wiki] = new LocalRDBStore( array( 'wiki' => $wiki ) );
+		}
+		return $this->intStoreInstances[$wiki];
+	}
+
+	/**
+	 * Get an external DB store by name.
+	 * This uses a multi-singleton pattern to track transactions.
+	 *
+	 * @param $name string Storage group ID
+	 * @return ExternalRDBStore
+	 * @throws MWException If no store is configured under that name
+	 */
+	public function getExternal( $name ) {
+		if ( !isset( $this->extStoreInstances[$name] ) ) {
+			if ( !isset( $this->storeConfig[$name] ) ) {
+				throw new MWException( "No DB store defined with the name '$name'." );
+			}
+			// The store is told its own name through its config array
+			$this->storeConfig[$name]['name'] = $name;
+			$this->extStoreInstances[$name] = new ExternalRDBStore( $this->storeConfig[$name] );
+		}
+		return $this->extStoreInstances[$name];
+	}
+
+	/**
+	 * Get the DB store designated for a certain DB table.
+	 * A LocalRDBStore will be returned if one is not configured.
+	 *
+	 * @param $table string Table name
+	 * @param $wiki string Wiki ID; defaults to the current wiki
+	 * @return RDBStore
+	 */
+	public function getForTable( $table, $wiki = false ) {
+		$wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+		if ( isset( $this->storedTables[$wiki][$table] ) ) {
+			return $this->getExternal( $this->storedTables[$wiki][$table] );
+		} else {
+			return $this->getInternal( $wiki );
+		}
+	}
+
+	/**
+	 * Get the DB store designated for a certain DB table group.
+	 * A LocalRDBStore will be returned if one is not configured.
+	 *
+	 * @param $name string Group ID
+	 * @param $wiki string Wiki ID; defaults to the current wiki
+	 * @return RDBStore
+	 * @throws MWException If the group is undefined/empty or spans several stores
+	 */
+	public function getForTableGroup( $name, $wiki = false ) {
+		$wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+		if ( !isset( $this->tableGroups[$name] ) || !count( $this->tableGroups[$name] ) ) {
+			throw new MWException( "RDB store table group '$name' is undefined or empty." );
+		}
+		$table = reset( $this->tableGroups[$name] ); // first table
+		$store = $this->storeNameForTable( $table, $wiki );
+		// Make sure all tables in this group are on the same store...
+		foreach ( $this->tableGroups[$name] as $groupTable ) {
+			if ( $this->storeNameForTable( $groupTable, $wiki ) !== $store ) {
+				$list = implode( ', ', $this->tableGroups[$name] );
+				throw new MWException( "Table(s) '$list' do not map to a single RDB store." );
+			}
+		}
+		// Resolve the store for the wiki that was validated above; omitting
+		// $wiki here would silently fall back to the current wiki's mapping.
+		return $this->getForTable( $table, $wiki );
+	}
+
+	/**
+	 * Get the sync configuration registered for a table, if any.
+	 *
+	 * @param $table string Table name
+	 * @return Array|null Config array for this table or null if none is registered
+	 */
+	public function getTableSyncInfo( $table ) {
+		// Return only this table's entry, not the whole (table name => config) map
+		return isset( $this->tableSyncInfo[$table] ) ? $this->tableSyncInfo[$table] : null;
+	}
+
+	/**
+	 * @param $table string Table name
+	 * @param $wiki string Wiki ID
+	 * @return string|false Returns false if not an external store
+	 */
+	protected function storeNameForTable( $table, $wiki ) {
+		return isset( $this->storedTables[$wiki][$table] )
+			? $this->storedTables[$wiki][$table]
+			: false; // local
+	}
+}
diff --git a/includes/rdbstore/RDBStoreTablePartition.php b/includes/rdbstore/RDBStoreTablePartition.php
new file mode 100644
index 000000000000..582a0f6a5229
--- /dev/null
+++ b/includes/rdbstore/RDBStoreTablePartition.php
@@ -0,0 +1,255 @@
+<?php
+/**
+ * This file deals with sharded RDBMS stores.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup RDBStore
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class representing a single partition of a virtual DB table.
+ * If a shard column value is provided, queries are restricted
+ * to those that apply to that value; otherwise, queries can be
+ * made on the entire table partition.
+ *
+ * @ingroup RDBStore
+ * @since 1.20
+ */
+abstract class RDBStoreTablePartition {
+	/** @var RDBStore */
+	protected $rdbStore;
+
+	protected $wiki; // string; wiki ID
+	protected $vTable; // string; virtual table name
+	protected $pTable; // string; partition table name
+	protected $key; // string; column name
+	protected $value; // string; column value
+
+	/**
+	 * @return string Wiki ID
+	 */
+	final public function getWiki() {
+		return $this->wiki;
+	}
+
+	/**
+	 * @return string Virtual table name (e.g. "flaggedtemplates")
+	 */
+	final public function getVirtualTable() {
+		return $this->vTable;
+	}
+
+	/**
+	 * @return string Partition table name (e.g. "flaggedtemplates__030__ft_rev_id")
+	 */
+	final public function getPartitionTable() {
+		return $this->pTable;
+	}
+
+	/**
+	 * @return string Name of the column used to shard on (e.g. "ft_rev_id")
+	 */
+	final public function getPartitionKey() {
+		return $this->key;
+	}
+
+	/**
+	 * @return string|null Value of the shard column or NULL
+	 */
+	final public function getPartitionKeyValue() {
+		return $this->value;
+	}
+
+	/**
+	 * Comparison callback for usort() that orders partitions by partition table name.
+	 * Use it for access patterns where several shards with the same shard column
+	 * need to be written to; writing in a fixed order reduces the possibility of deadlocks.
+	 *
+	 * @param RDBStoreTablePartition $a
+	 * @param RDBStoreTablePartition $b
+	 * @return integer (negative if a < b, 0 if a = b, positive if a > b)
+	 */
+	final public function compare( RDBStoreTablePartition $a, RDBStoreTablePartition $b ) {
+		$leftTable = $a->getPartitionTable();
+		$rightTable = $b->getPartitionTable();
+		return strcmp( $leftTable, $rightTable );
+	}
+
+	/**
+	 * Similar to DatabaseBase::select() except the first argument is DB_SLAVE/DB_MASTER
+	 * and the query always runs against this partition's table.
+	 *
+	 * @see DatabaseBase::select()
+	 * @param $index integer DB_SLAVE or DB_MASTER
+	 * @param $vars Array|string
+	 * @param $conds Array Must include the shard column
+	 * @param $fname string
+	 * @param $options Array
+	 * @return ResultWrapper
+	 * @throws MWException
+	 */
+	final public function select( $index, $vars, array $conds, $fname, $options = array() ) {
+		$this->assertKeyCond( $conds ); // sanity
+		$conn = $this->connectionForIndex( $index );
+		return $conn->select( $this->pTable, $vars, $conds, $fname, $options );
+	}
+
+	/**
+	 * Similar to DatabaseBase::selectRow() except the first argument is DB_SLAVE/DB_MASTER
+	 * and the query always runs against this partition's table.
+	 *
+	 * @see DatabaseBase::selectRow()
+	 * @param $index integer DB_SLAVE or DB_MASTER
+	 * @param $vars Array|string
+	 * @param $conds Array Must include the shard column
+	 * @param $fname string
+	 * @param $options Array
+	 * @return mixed Whatever DatabaseBase::selectRow() returns
+	 * @throws MWException
+	 */
+	final public function selectRow( $index, $vars, array $conds, $fname, $options = array() ) {
+		$this->assertKeyCond( $conds ); // sanity
+		$conn = $this->connectionForIndex( $index );
+		return $conn->selectRow( $this->pTable, $vars, $conds, $fname, $options );
+	}
+
+	/**
+	 * Similar to DatabaseBase::selectField() except the first argument is DB_SLAVE/DB_MASTER
+	 * and the query always runs against this partition's table.
+	 *
+	 * @see DatabaseBase::selectField()
+	 * @param $index integer DB_SLAVE or DB_MASTER
+	 * @param $var string
+	 * @param $conds Array Must include the shard column
+	 * @param $fname string
+	 * @param $options Array
+	 * @return mixed Whatever DatabaseBase::selectField() returns
+	 * @throws MWException
+	 */
+	final public function selectField( $index, $var, array $conds, $fname, $options = array() ) {
+		$this->assertKeyCond( $conds ); // sanity
+		$conn = $this->connectionForIndex( $index );
+		return $conn->selectField( $this->pTable, $var, $conds, $fname, $options );
+	}
+
+	/**
+	 * Insert one row, or a list of rows, into this partition's table.
+	 *
+	 * @see DatabaseBase::insert()
+	 * @see DatabaseBase::affectedRows()
+	 * @param $rows Array A single row or a list of rows
+	 * @param $fname String
+	 * @param $options Array
+	 * @return integer Number of affected rows
+	 * @throws MWException
+	 */
+	final public function insert( array $rows, $fname, $options = array() ) {
+		// Normalize a single (column => value) row into a one-element list
+		if ( !isset( $rows[0] ) || !is_array( $rows[0] ) ) {
+			$rows = array( $rows );
+		}
+		foreach ( $rows as $row ) {
+			$this->assertKeyCond( $row ); // sanity
+		}
+
+		$master = $this->getMasterDB();
+		$master->insert( $this->pTable, $rows, $fname, $options );
+		return $master->affectedRows();
+	}
+
+	/**
+	 * Update rows of this partition's table; the shard column itself may not be changed.
+	 *
+	 * @see DatabaseBase::update()
+	 * @see DatabaseBase::affectedRows()
+	 * @param $values Array
+	 * @param $conds Array Must include the shard column
+	 * @param $fname String
+	 * @return integer Number of affected rows
+	 * @throws MWException
+	 */
+	final public function update( $values, array $conds, $fname ) {
+		$this->assertKeyCond( $conds ); // sanity
+		$this->assertKeyNotSet( $values ); // sanity
+
+		$master = $this->getMasterDB();
+		$master->update( $this->pTable, $values, $conds, $fname );
+		return $master->affectedRows();
+	}
+
+	/**
+	 * Delete rows from this partition's table.
+	 *
+	 * @see DatabaseBase::delete()
+	 * @see DatabaseBase::affectedRows()
+	 * @param $conds Array Must include the shard column
+	 * @param $fname String
+	 * @return integer Number of affected rows
+	 * @throws MWException
+	 */
+	final public function delete( array $conds, $fname ) {
+		$this->assertKeyCond( $conds ); // sanity
+
+		$master = $this->getMasterDB();
+		$master->delete( $this->pTable, $conds, $fname );
+		return $master->affectedRows();
+	}
+
+	/**
+	 * Get a direct slave DB connection.
+	 * Queries should always be done using the provided wrappers.
+	 * This can be used to call functions like DatabaseBase::timestamp().
+	 *
+	 * @return DatabaseBase
+	 */
+	abstract public function getSlaveDB();
+
+	/**
+	 * Get a direct master DB connection.
+	 * Queries should always be done using the provided wrappers.
+	 * This can be used to call functions like DatabaseBase::timestamp().
+	 *
+	 * @return DatabaseBase
+	 */
+	abstract public function getMasterDB();
+
+	/**
+	 * Map a DB_SLAVE/DB_MASTER constant to the matching connection.
+	 *
+	 * @param $index integer DB_SLAVE or DB_MASTER
+	 * @return DatabaseBase
+	 * @throws MWException If $index is neither constant
+	 */
+	private function connectionForIndex( $index ) {
+		if ( $index == DB_SLAVE ) {
+			return $this->getSlaveDB();
+		}
+		if ( $index == DB_MASTER ) {
+			return $this->getMasterDB();
+		}
+		throw new MWException( "First argument must be DB_SLAVE or DB_MASTER." );
+	}
+
+	/**
+	 * Do a (partition key => value) assertion on a WHERE or insertion row array.
+	 * The shard column must be present; if this partition is bound to a value,
+	 * the given value must match it when compared as a string.
+	 * This protects against callers forgetting to add the condition or saving
+	 * rows to the wrong table shard.
+	 *
+	 * @param $conds array
+	 * @throws MWException
+	 */
+	final protected function assertKeyCond( array $conds ) {
+		if ( !isset( $conds[$this->key] ) ) {
+			throw new MWException( "Shard column '{$this->key}' value not provided." );
+		}
+		if ( $this->value === null ) {
+			return; // not bound to a single shard column value
+		}
+		if ( strval( $conds[$this->key] ) !== $this->value ) {
+			throw new MWException( "Shard column '{$this->key}' value is mismatched." );
+		}
+	}
+
+	/**
+	 * Do a (partition key => value) assertion on a SET clause for an UPDATE statement.
+	 * This sanity checks that the shard column value is not getting changed, which would
+	 * make the row inaccessible since it would probably then be placed on the wrong shard.
+	 *
+	 * @param $set array
+	 * @throws MWException
+	 */
+	final protected function assertKeyNotSet( array $set ) {
+		$changesShardColumn = isset( $set[$this->key] );
+		if ( $changesShardColumn ) {
+			throw new MWException( "Shard column '{$this->key}' given in SET clause." );
+		}
+	}
+}
diff --git a/maintenance/rdbstore/RDBAddCluster.php b/maintenance/rdbstore/RDBAddCluster.php
new file mode 100644
index 000000000000..32817e51e841
--- /dev/null
+++ b/maintenance/rdbstore/RDBAddCluster.php
@@ -0,0 +1,66 @@
+<?php
+/**
+ * Tool to help with addition of new clusters to an RDB store.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ * @ingroup Maintenance
+ */
+
+require_once( dirname( __FILE__ ) . '/../Maintenance.php' );
+
+/**
+ * Maintenance script to help with new ExternalRDBStore clusters
+ *
+ * @ingroup Maintenance
+ */
+class RDBAddCluster extends Maintenance {
+	public function __construct() {
+		parent::__construct();
+		$this->addOption( "cluster", "Name of the DB cluster", true, true );
+		$this->addOption( "dblist", "List of DBs to create", true, true );
+		$this->mDescription = "Prepare a new DB cluster for RDB storage";
+	}
+
+	/**
+	 * @return integer Maintenance::DB_ADMIN, as this script issues CREATE DATABASE
+	 */
+	public function getDbType() {
+		return Maintenance::DB_ADMIN;
+	}
+
+	/**
+	 * Create every database named in the --dblist file (one name per line)
+	 * on the master server of the external cluster given by --cluster.
+	 * Nothing is created if --dblist does not point to a file.
+	 */
+	public function execute() {
+		$lb = wfGetLBFactory()->getExternalLB( $this->getOption( 'cluster' ) );
+
+		// Connect straight to the cluster's writer using its server config
+		$server = $lb->getServerInfo( $lb->getWriterIndex() );
+		$dbw = DatabaseBase::factory( $server['type'], $server );
+		$type = $dbw->getType(); // DB server type (mysql, postgres)
+
+		$dbList = $this->getOption( 'dblist' );
+		if ( $dbList && is_file( $dbList ) ) {
+			$this->output( "Creating databases from '$dbList'.\n" );
+			// Trim each line so that CRLF line endings or stray padding in the
+			// list file do not end up inside the database names; then drop blanks.
+			$dbNames = array_filter(
+				array_map( 'trim', explode( "\n", file_get_contents( $dbList ) ) ),
+				'strlen'
+			);
+			foreach ( $dbNames as $dbName ) {
+				$encDbName = $dbw->addIdentifierQuotes( $dbName );
+				$dbw->query( "CREATE DATABASE IF NOT EXISTS $encDbName" );
+				$this->output( "Created $type database '$dbName'.\n" );
+			}
+		}
+		$this->output( "Done.\n" );
+	}
+}
+
+$maintClass = "RDBAddCluster";
+require_once( RUN_MAINTENANCE_IF_MAIN );
diff --git a/maintenance/rdbstore/RDBPopulation.php b/maintenance/rdbstore/RDBPopulation.php
new file mode 100644
index 000000000000..c525eda2eec0
--- /dev/null
+++ b/maintenance/rdbstore/RDBPopulation.php
@@ -0,0 +1,98 @@
+<?php
+/**
+ * Tool to help copy rows from a local DB table into sharded DB storage.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ * @ingroup Maintenance
+ */
+
+require_once( dirname( __FILE__ ) . '/../Maintenance.php' );
+
+/**
+ * Maintenance script to copy a table to a sharded DB store
+ *
+ * @ingroup Maintenance
+ */
+class RDBPopulation extends Maintenance {
+ /**
+ * Register the command line options and set the default batch size to 500 rows.
+ */
+ public function __construct() {
+ parent::__construct();
+ $this->addOption( "table", "Local DB table name", true, true );
+ $this->addOption( "indexColumn", "Indexed integer column", true, true );
+ $this->addOption( "shardColumn", "Shard column", true, true );
+ $this->addOption( "store", "The external RDB store name", true, true );
+ $this->mDescription = "Copy a local table into sharded DB storage";
+ $this->setBatchSize( 500 );
+ }
+
+ /**
+ * Walk the local table in batches ordered by the index column and insert
+ * each row into the partition of the external store selected by the row's
+ * shard column value. Inserts use the IGNORE option.
+ */
+ public function execute() {
+ $table = $this->getOption( 'table' );
+ $indexCol = $this->getOption( 'indexColumn' );
+ $shardCol = $this->getOption( 'shardColumn' );
+
+ $dbw = $this->getDB( DB_MASTER );
+ // Bounds of the scan; rows with an index value above $end are not copied
+ $start = $dbw->selectField( $table, "MIN($indexCol)", false, __METHOD__ );
+ $end = $dbw->selectField( $table, "MAX($indexCol)", false, __METHOD__ );
+ if ( !$start || !$end ) {
+ $this->output( "...$table table seems to be empty.\n" );
+ return;
+ } elseif ( !$dbw->fieldExists( $table, $shardCol ) ) { // sanity
+ $this->error( "Table '$table' missing column '$shardCol'.", 1 ); // die
+ }
+ $rdbStore = RDBStoreGroup::singleton()->getExternal( $this->getOption( 'store' ) );
+
+ $first = true;
+ $count = 0;
+ // $indexCol may be a BIGINT or a UID, which are too large for PHP ints.
+ // Also UIDs are sparse, rather than incrementing by units of 1 each time.
+ // Lastly, $indexCol might be only the first column of a unique index.
+ // NOTE(review): every block after the first selects "$indexCol > $blockStart", so if
+ // several rows share the last index value of a full batch, the ones beyond the LIMIT
+ // look like they would be skipped -- confirm whether $indexCol is unique in practice.
+ $blockStart = $start;
+ do {
+ $inequality = $first ? ">=" : ">"; // inclusive for the first block
+ $res = $dbw->select( $table, '*',
+ array(
+ "$indexCol $inequality {$dbw->addQuotes( $blockStart )}",
+ "$indexCol <= {$dbw->addQuotes( $end )}"
+ ),
+ __METHOD__,
+ array( 'ORDER BY' => $indexCol, 'LIMIT' => $this->mBatchSize )
+ );
+ $n = $dbw->numRows( $res );
+
+ if ( $n ) {
+ // Peek at the last row of the batch to learn the highest index value in it
+ $res->seek( $n - 1 );
+ $blockEnd = $dbw->fetchObject( $res )->$indexCol;
+ $this->output( "...doing $indexCol from $blockStart to $blockEnd, $n row(s)\n" );
+ }
+
+ // Copy each row to the partition chosen by its shard column value
+ foreach ( $res as $row ) {
+ $pTable = $rdbStore->getPartition( $table, $shardCol, $row->$shardCol );
+ $count += $pTable->insert( (array)$row, __METHOD__, array( 'IGNORE' ) );
+ }
+
+ $first = false;
+ // When the batch was empty, $blockEnd still holds the value from the previous batch
+ $blockStart = $blockEnd; // highest ID done so far
+ wfWaitForSlaves( 5 );
+ } while ( $n > 0 );
+
+ $this->output( "Done. Inserted $count rows from '$table' table.\n" );
+ }
+}
+
+$maintClass = "RDBPopulation";
+require_once( RUN_MAINTENANCE_IF_MAIN );
diff --git a/maintenance/rdbstore/RDBSchemaChange.php b/maintenance/rdbstore/RDBSchemaChange.php
new file mode 100644
index 000000000000..84966aea77b9
--- /dev/null
+++ b/maintenance/rdbstore/RDBSchemaChange.php
@@ -0,0 +1,124 @@
+<?php
+/**
+ * Tool to help with CREATE/ALTER statements on a virtual table that
+ * is split up into many shards in an ExternalRDBStore backend.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ * @ingroup Maintenance
+ */
+
+require_once( dirname( __FILE__ ) . '/../Maintenance.php' );
+
+/**
+ * Maintenance script to help with ExternalRDBStore schema changes
+ *
+ * @ingroup Maintenance
+ */
+class RDBSchemaChange extends Maintenance {
+	public function __construct() {
+		parent::__construct();
+		$this->addOption( "mode", "Either 'generate' or 'run'", true, true );
+		$this->addOption( "store", "The external RDB store name", true, true );
+		$this->addOption( "patchfile", "The .sql file to run", false, true );
+		$this->addOption( "dir", "Directory to generate or run .sql files", true, true );
+		$this->mDescription = "Prepare or run a schema change for a table on a wiki";
+	}
+
+	/**
+	 * @return integer Maintenance::DB_ADMIN, as this script runs schema changes
+	 */
+	public function getDbType() {
+		return Maintenance::DB_ADMIN;
+	}
+
+	/**
+	 * Make sure the --dir directory exists, then either run the generated
+	 * patches (--mode=run) or generate them (any other mode).
+	 */
+	public function execute() {
+		$dir = $this->getOption( 'dir' );
+		if ( !is_dir( $dir ) && !mkdir( $dir ) ) {
+			$this->error( "Invalid SQL file directory '$dir'.", 1 );
+		}
+		if ( $this->getOption( 'mode' ) === 'run' ) {
+			$this->runGeneratedSQL();
+		} else {
+			$this->generateSQL();
+		}
+	}
+
+	/**
+	 * Expand the --patchfile template into one .sql file per shard, written
+	 * to "<dir>/<cluster>/<shard index>-<patch name>" for every cluster of the store.
+	 */
+	protected function generateSQL() {
+		$dir = $this->getOption( 'dir' );
+		// Read in the SQL patch file...
+		$patchFile = $this->getOption( 'patchfile' );
+		if ( FileBackend::extensionFromPath( $patchFile ) !== 'sql' || !is_file( $patchFile ) ) {
+			$this->error( "Invalid SQL patch file '$patchFile' given.", 1 );
+		}
+		$patchName = basename( $patchFile );
+		$sql = file_get_contents( $patchFile );
+		if ( $sql === false ) {
+			$this->error( "Could not read SQL patch file '$patchFile'.", 1 );
+		}
+		// Get the SQL patch file to run on each cluster...
+		$rdbStore = RDBStoreGroup::singleton()->getExternal( $this->getOption( 'store' ) );
+		foreach ( $rdbStore->getClusterMapping() as $cluster => $shards ) {
+			@mkdir( "{$dir}/{$cluster}" ); // may already exist
+			// Go through each shard that this cluster serves...
+			foreach ( $shards as $index ) {
+				// Get the padded index shard number, which appears in tables names
+				$i = $rdbStore->formatShardIndex( $index ); // e.g. "0200"
+				// Transform the patch to account for the table sharding conventions.
+				// E.g., replace things like "flaggedimages/*__#SHARD#__fi_rev_id__*/"
+				// with things like "flaggedimages__0200__fi_rev_id".
+				$count = substr_count( $sql, '/*_*/' ); // table prefix notation
+				// Every prefixed table reference is expected to carry a shard annotation
+				if ( $count && $count != substr_count( $sql, '__#SHARD#__' ) ) {
+					$this->error( "Could not transform SQL patch file '$patchFile'.", 1 );
+				}
+				$shardSQL = preg_replace( '!/\*__#SHARD#__(\S+)\*/!U', "__{$i}__$1", $sql );
+				if ( $shardSQL === $sql ) { // nothing was replaced
+					$this->error( "Could not transform SQL patch file '$patchFile'.", 1 );
+				}
+				// Save SQL patch to make the schema change for each table shard
+				if ( !file_put_contents( "{$dir}/{$cluster}/{$i}-{$patchName}", $shardSQL ) ) {
+					$this->error( "Could not write SQL patch file '{$i}-{$patchName}'.", 1 );
+				}
+			}
+		}
+	}
+
+	/**
+	 * Source every .sql file found under "<dir>/<cluster>" on the master of
+	 * that cluster, for each cluster of the store, using the --wiki database.
+	 */
+	protected function runGeneratedSQL() {
+		$wiki = $this->getOption( 'wiki' );
+		if ( !strlen( $wiki ) ) {
+			$this->error( "No wiki ID provided.", 1 );
+		}
+		$dir = $this->getOption( 'dir' );
+		// Get the SQL patch file to run on each cluster...
+		$rdbStore = RDBStoreGroup::singleton()->getExternal( $this->getOption( 'store' ) );
+		foreach ( $rdbStore->getClusterMapping() as $cluster => $shards ) {
+			$clusterDir = "{$dir}/{$cluster}";
+			if ( !is_dir( $clusterDir ) ) {
+				continue; // nothing was generated for this cluster
+			}
+			$handle = opendir( $clusterDir );
+			if ( $handle === false ) {
+				$this->error( "Could not open SQL file directory '$clusterDir'.", 1 );
+			}
+			// Go through all generated SQL files for this cluster...
+			// Compare against false explicitly: an entry whose name evaluates
+			// to false (e.g. "0") would otherwise end the loop prematurely.
+			while ( ( $file = readdir( $handle ) ) !== false ) {
+				if ( FileBackend::extensionFromPath( $file ) === 'sql' ) {
+					$lb = wfGetLBFactory()->getExternalLB( $cluster );
+					$dbw = $lb->getConnection( DB_MASTER, array(), $wiki );
+					$dbw->sourceFile( "{$clusterDir}/{$file}" );
+				}
+			}
+			closedir( $handle );
+		}
+	}
+}
+
+$maintClass = "RDBSchemaChange";
+require_once( RUN_MAINTENANCE_IF_MAIN );
diff --git a/maintenance/rdbstore/RDBSyncPartitions.php b/maintenance/rdbstore/RDBSyncPartitions.php
new file mode 100644
index 000000000000..6f2082a2f20d
--- /dev/null
+++ b/maintenance/rdbstore/RDBSyncPartitions.php
@@ -0,0 +1,236 @@
+<?php
+/**
+ * Tool to sync denormalized shards in a sharded DB storage.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ * @ingroup Maintenance
+ */
+
+require_once( dirname( __FILE__ ) . '/../Maintenance.php' );
+
+/**
+ * Maintenance script to fix denormalized shards in a sharded DB store
+ *
+ * @ingroup Maintenance
+ */
+class RDBSyncPartitions extends Maintenance {
+	protected $sTimestamp; // integer; UNIX timestamp
+	protected $eTimestamp; // integer; UNIX timestamp
+
+	const TRX_LIMBO_SEC = 86400; // integer; prepared transactions this old are stale
+
+	public function __construct() {
+		parent::__construct();
+		$this->addOption( "store", "The external RDB store name", true, true );
+		$this->addOption( "cluster", "The name of a DB cluster within the store", true, true );
+		$this->addOption( "posdir", "Directory to store last-scan timestamp", true, true );
+		$this->addOption( "fixlimbo", "Resolve all day old limbo transactions (for 2PC)" );
+		$this->mDescription = "Sync denormalized table shards within sharded DB storage";
+		$this->setBatchSize( 1000 );
+	}
+
+	/**
+	 * Work out the timestamp range to scan (from the position file up to now,
+	 * capped by any unresolved prepared transaction and minus a skew margin),
+	 * sync every syncable table of the current wiki stored in --store, and
+	 * finally save the end of the range to the position file.
+	 */
+	public function execute() {
+		global $wgRDBStoredTables;
+
+		$store = $this->getOption( 'store' );
+		$cluster = $this->getOption( 'cluster' );
+		$posdir = $this->getOption( 'posdir' );
+		$fixlimbo = $this->hasOption( 'fixlimbo' );
+		$wiki = wfWikiID();
+
+		$rdbs = RDBStoreGroup::singleton()->getExternal( $store );
+		$mapping = $rdbs->getClusterMapping(); // (cluster => indexes)
+		if ( !isset( $mapping[$cluster] ) ) {
+			$this->error( "Cluster '$cluster' does not exist in store '$store'.\n", 1 ); // die
+		}
+		$indexes = $mapping[$cluster]; // indexes handled by this cluster
+
+		// Resume from the position saved by the previous run, if there is one
+		wfMkdirParents( "{$posdir}/{$cluster}" );
+		wfSuppressWarnings();
+		$startTsUnix = (int)file_get_contents( "{$posdir}/{$cluster}/{$wiki}" );
+		wfRestoreWarnings();
+		$startTsUnix = $startTsUnix ? $startTsUnix : 1; /* default to ~epoch */
+
+		$lb = wfGetLBFactory()->getExternalLB( $cluster );
+		$dbw = $lb->getConnection( DB_MASTER );
+
+		$endTsUnix = time(); // do not scan rows added after this point
+		if ( $rdbs->is2PCEnabled() ) {
+			$this->output( "Checking for limbo transactions in RDB store '$store'...\n" );
+			foreach ( $rdbs->connGetAllPreparedXAInternal( $dbw ) as $trxId ) {
+				// Only IDs of the form "<14 digit timestamp>-<32 hex digits>" are considered
+				if ( preg_match( '/^(\d{14})-([0-9a-f]{32})$/', $trxId, $matches ) ) {
+					list( /* all */, $ts, /* uuid */ ) = $matches;
+					$tsUnix = wfTimestamp( TS_UNIX, $ts );
+					if ( $tsUnix < ( time() - self::TRX_LIMBO_SEC ) ) {
+						if ( $fixlimbo ) { // resolve limbo transaction
+							$rdbs->connRollbackXAInternal( $dbw, $trxId );
+							$this->output( "Rolled back limbo transaction '$trxId'.\n" );
+						} else {
+							$this->output( "Detected limbo transaction '$trxId'.\n" );
+							$endTsUnix = min( $tsUnix, $endTsUnix );
+						}
+					} else {
+						// Still-pending transaction: don't scan past its timestamp
+						$endTsUnix = min( $tsUnix, $endTsUnix );
+					}
+				}
+			}
+		}
+		$this->output( "Done.\n" );
+
+		$this->sTimestamp = $startTsUnix;
+		$this->eTimestamp = $endTsUnix - 900; // good measure (skew factor)
+
+		$this->output( "Row timestamp scan range is [" .
+			wfTimestamp( TS_MW, $this->sTimestamp ) . "," .
+			wfTimestamp( TS_MW, $this->eTimestamp ) . "]\n"
+		);
+
+		$this->output( "Syncronizing denormalized data in RDB store '$store'...\n" );
+		if ( $rdbs->isPartitioned() && isset( $wgRDBStoredTables[$wiki] ) ) {
+			foreach ( $wgRDBStoredTables[$wiki] as $table => $tstore ) { // each table
+				$syncInfo = RDBStoreGroup::singleton()->getTableSyncInfo( $table );
+				if ( $tstore === $store && is_array( $syncInfo ) ) { // syncable
+					$this->output( "Syncing table $table...\n" );
+					$count = $this->syncPush( $rdbs, $table, $syncInfo, $indexes );
+					$this->output( "Pushed $count row(s) missing rows.\n" );
+					$count = $this->syncPull( $rdbs, $table, $syncInfo, $indexes );
+					$this->output( "Removed $count row(s) bogus rows.\n" );
+				}
+			}
+		}
+		$this->output( "Done.\n" );
+
+		if ( file_put_contents( "{$posdir}/{$cluster}/{$wiki}", $this->eTimestamp ) !== false ) {
+			$this->output( "Updated UNIX timestamp position file to '{$this->eTimestamp}'.\n" );
+		}
+	}
+
+	/**
+	 * Get the column => value pairs of $row with columns in the partition table
+	 *
+	 * @param $row array
+	 * @param $tp RDBStoreTablePartition
+	 * @return array Subset of $row whose fields exist in the partition table
+	 */
+	protected function getRowForTable( array $row, RDBStoreTablePartition $tp ) {
+		$dbw = $tp->getMasterDB();
+		$pTable = $tp->getPartitionTable();
+		$resRow = array();
+		foreach ( $row as $field => $value ) {
+			if ( $dbw->fieldExists( $pTable, $field ) ) {
+				$resRow[$field] = $value;
+			}
+		}
+		return $resRow;
+	}
+
+	/**
+	 * Push updates from the canonical source shards to the duplicate shards.
+	 * Rows found in a canonical shard but missing from a duplicate shard are inserted there.
+	 * We don't need to select FOR UPDATE as we should only sync immutable data.
+	 *
+	 * @param $rdbs RDBStore
+	 * @param $table string Virtual table name
+	 * @param $info array Sync info; the keys read here are 'srcShardKey',
+	 *   'dupShardKeys', 'timeColumn' and 'uniqueKey'
+	 * @param $indexes array Shard indexes to scan
+	 * @return integer Number of rows fixed
+	 */
+	protected function syncPush( RDBStore $rdbs, $table, array $info, array $indexes ) {
+		// array_map() takes the callback first, and the closure has to import
+		// every outer variable it reads through "use"
+		$cpartitions = array_map(
+			function( $index ) use ( $rdbs, $table, $info ) {
+				return new ExternalRDBStoreTablePartition(
+					$rdbs, $table, $index, $info['srcShardKey'], null, wfWikiID() );
+			},
+			$indexes
+		);
+		$count = 0;
+		foreach ( $cpartitions as $cpartition ) { // for each canonical shard
+			$dbw = $cpartition->getMasterDB();
+			$start = $dbw->addQuotes( $dbw->timestamp( $this->sTimestamp ) );
+			$end = $dbw->addQuotes( $dbw->timestamp( $this->eTimestamp ) );
+			do {
+				// This scans the whole shard by time, so there is no shard column
+				// condition to give; RDBStoreTablePartition::select() rejects such
+				// conditions, hence the query goes straight to the master connection.
+				$res = $dbw->select( $cpartition->getPartitionTable(), '*',
+					array( "{$info['timeColumn']} BETWEEN $start AND $end" ),
+					__METHOD__,
+					array( 'ORDER BY' => $info['timeColumn'], 'LIMIT' => $this->mBatchSize )
+				);
+				$oldStart = $start; // previous start
+				foreach ( $res as $srcRow ) { // each canonical row
+					$srcRow = (array)$srcRow;
+					foreach ( $info['dupShardKeys'] as $shardKey ) { // each duplicate shard for row
+						$dpartition = $rdbs->getPartition( $table, $shardKey, $srcRow[$shardKey] );
+						// NOTE(review): assumes $info['uniqueKey'] is keyed by column name -- confirm
+						$dupRow = $dpartition->selectRow( DB_MASTER, '*',
+							array_intersect_key( $srcRow, $info['uniqueKey'] ),
+							__METHOD__
+						);
+						if ( !$dupRow ) { // row missing in duplicate shard; create it
+							$dupRow = $this->getRowForTable( $srcRow, $dpartition );
+							$dpartition->insert( $dupRow, __METHOD__, array( 'IGNORE' ) );
+							$count++;
+						}
+					}
+					$start = $dbw->addQuotes( $srcRow[$info['timeColumn']] ); // advance
+				}
+				// Decide whether the batch was full before the batch size is bumped
+				// below; otherwise a stuck (full, non-advancing) batch ends the scan.
+				$more = ( $res->numRows() >= $this->mBatchSize );
+				if ( $res->numRows() && $start === $oldStart ) {
+					$this->mBatchSize += 10; // don't get stuck
+				}
+			} while ( $more );
+		}
+		return $count;
+	}
+
+	/**
+	 * Remove rows from the duplicate shards that have no counterpart in the
+	 * canonical source shards.
+	 * We don't need to select FOR UPDATE as we should only sync immutable data.
+	 *
+	 * @param $rdbs RDBStore
+	 * @param $table string Virtual table name
+	 * @param $info array Sync info; the keys read here are 'srcShardKey',
+	 *   'dupShardKeys', 'timeColumn' and 'uniqueKey'
+	 * @param $indexes array Shard indexes to scan
+	 * @return integer Number of rows fixed
+	 */
+	protected function syncPull( RDBStore $rdbs, $table, array $info, array $indexes ) {
+		$count = 0;
+		foreach ( $info['dupShardKeys'] as $shardKey ) { // each shard key column
+			// array_map() takes the callback first, and the closure has to import
+			// every outer variable it reads through "use"
+			$dpartitions = array_map(
+				function( $index ) use ( $rdbs, $table, $shardKey ) {
+					return new ExternalRDBStoreTablePartition(
+						$rdbs, $table, $index, $shardKey, null, wfWikiID() );
+				},
+				$indexes
+			);
+			foreach ( $dpartitions as $dpartition ) { // each duplicate shard
+				$dbw = $dpartition->getMasterDB();
+				$start = $dbw->addQuotes( $dbw->timestamp( $this->sTimestamp ) );
+				$end = $dbw->addQuotes( $dbw->timestamp( $this->eTimestamp ) );
+				do {
+					// This scans the whole shard by time, so there is no shard column
+					// condition to give; RDBStoreTablePartition::select() rejects such
+					// conditions, hence the query goes straight to the master connection.
+					$res = $dbw->select( $dpartition->getPartitionTable(), '*',
+						array( "{$info['timeColumn']} BETWEEN $start AND $end" ),
+						__METHOD__,
+						array( 'ORDER BY' => $info['timeColumn'], 'LIMIT' => $this->mBatchSize )
+					);
+					$oldStart = $start; // previous start
+					foreach ( $res as $dupRow ) { // each duplicate row
+						$dupRow = (array)$dupRow;
+						$cpartition = $rdbs->getPartition( $table, // canonical shard for row
+							$info['srcShardKey'], $dupRow[$info['srcShardKey']] );
+						// NOTE(review): assumes $info['uniqueKey'] is keyed by column name -- confirm
+						$exists = $cpartition->selectField( DB_MASTER, '1',
+							array_intersect_key( $dupRow, $info['uniqueKey'] ),
+							__METHOD__
+						);
+						if ( !$exists ) { // row in duplicate shard is extraneous; delete it
+							$dpartition->delete( $dupRow, __METHOD__ );
+							$count++;
+						}
+						$start = $dbw->addQuotes( $dupRow[$info['timeColumn']] ); // advance
+					}
+					// Decide whether the batch was full before the batch size is bumped
+					// below; otherwise a stuck (full, non-advancing) batch ends the scan.
+					$more = ( $res->numRows() >= $this->mBatchSize );
+					if ( $res->numRows() && $start === $oldStart ) {
+						$this->mBatchSize += 10; // don't get stuck
+					}
+				} while ( $more );
+			}
+		}
+		return $count;
+	}
+}
+
+$maintClass = "RDBSyncPartitions";
+require_once( RUN_MAINTENANCE_IF_MAIN );
diff --git a/maintenance/rdbstore/sample.sql b/maintenance/rdbstore/sample.sql
new file mode 100644
index 000000000000..a35b6bc4e671
--- /dev/null
+++ b/maintenance/rdbstore/sample.sql
@@ -0,0 +1,12 @@
+
+-- Example table using sharding annotations
+-- The comment that follows the table name, /*__#SHARD#__<column>*/, is the shard
+-- annotation. RDBSchemaChange.php in "generate" mode replaces it with
+-- "__<shard index>__<column>", giving one table name per shard
+-- (e.g. mytable__0200__myt_id). /*_*/ is the table prefix notation.
+CREATE TABLE IF NOT EXISTS /*_*/mytable/*__#SHARD#__myt_id*/ (
+ myt_id integer unsigned NOT NULL,
+ myt_name varchar(255) binary NOT NULL default '',
+ myt_timestamp varbinary(14) NULL,
+ myt_value varbinary(32) NOT NULL default '',
+
+ PRIMARY KEY (myt_id)
+) /*$wgDBTableOptions*/;
+
+-- Statements that name the table again must repeat the same shard annotation
+CREATE UNIQUE INDEX /*i*/myt_value ON /*_*/mytable/*__#SHARD#__myt_id*/ (myt_value);