func (restore *MongoRestore) RestoreOplog() error { |
| log.Logv(log.Always, "replaying oplog") |
| intent := restore.manager.Oplog() |
| if intent == nil { |
| // this should not be reached |
| log.Logv(log.Always, "no oplog file provided, skipping oplog application") |
| return nil |
| } |
| if err := intent.BSONFile.Open(); err != nil { |
| return err |
| } |
| if fileNeedsIOBuffer, ok := intent.BSONFile.(intents.FileNeedsIOBuffer); ok { |
| fileNeedsIOBuffer.TakeIOBuffer(make([]byte, db.MaxBSONSize)) |
| } |
| defer intent.BSONFile.Close() |
| // NewBufferlessBSONSource reads each bson document into its own buffer |
| // because bson.Unmarshal currently can't unmarshal binary types without |
| // them referencing the source buffer |
| bsonSource := db.NewDecodedBSONSource(db.NewBufferlessBSONSource(intent.BSONFile)) |
| defer bsonSource.Close() |
|
|
| rawOplogEntry := &bson.Raw{} |
|
|
| var totalOps int64 |
| var entrySize int |
|
|
| oplogProgressor := progress.NewCounter(intent.BSONSize) |
| if restore.ProgressManager != nil { |
| restore.ProgressManager.Attach("oplog", oplogProgressor) |
| defer restore.ProgressManager.Detach("oplog") |
| } |
|
|
| session, err := restore.SessionProvider.GetSession() |
| if err != nil { |
| return fmt.Errorf("error establishing connection: %v", err) |
| } |
| defer session.Close() |
|
|
| for bsonSource.Next(rawOplogEntry) { |
| entrySize = len(rawOplogEntry.Data) |
|
|
| entryAsOplog := db.Oplog{} |
| err = bson.Unmarshal(rawOplogEntry.Data, &entryAsOplog) |
| if err != nil { |
| return fmt.Errorf("error reading oplog: %v", err) |
| } |
| if entryAsOplog.Operation == "n" { |
| //skip no-ops |
| continue |
| } |
| if !restore.TimestampBeforeLimit(entryAsOplog.Timestamp) { |
| log.Logvf( |
| log.DebugLow, |
| "timestamp %v is not below limit of %v; ending oplog restoration", |
| entryAsOplog.Timestamp, |
| restore.oplogLimit, |
| ) |
| break |
| } |
|
|
| totalOps++ |
| oplogProgressor.Inc(int64(entrySize)) |
| err = restore.ApplyOps(session, []interface{}{entryAsOplog}) |
| if err != nil { |
| return fmt.Errorf("error applying oplog: %v", err) |
| } |
| } |
| if fileNeedsIOBuffer, ok := intent.BSONFile.(intents.FileNeedsIOBuffer); ok { |
| fileNeedsIOBuffer.ReleaseIOBuffer() |
| } |
|
|
| log.Logvf(log.Info, "applied %v ops", totalOps) |
| return nil |
|
|
| } |