001/*
002 * Syncany, www.syncany.org
003 * Copyright (C) 2011-2016 Philipp C. Heckel <philipp.heckel@gmail.com>
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU General Public License for more details.
014 *
015 * You should have received a copy of the GNU General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package org.syncany.operations.up;
019
020import java.io.File;
021import java.io.FileOutputStream;
022import java.io.IOException;
023import java.io.OutputStreamWriter;
024import java.io.PrintWriter;
025import java.sql.SQLException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collection;
029import java.util.Date;
030import java.util.Iterator;
031import java.util.List;
032import java.util.concurrent.BlockingQueue;
033import java.util.concurrent.LinkedBlockingQueue;
034import java.util.logging.Level;
035import java.util.logging.Logger;
036
037import org.syncany.chunk.Deduper;
038import org.syncany.config.Config;
039import org.syncany.database.ChunkEntry;
040import org.syncany.database.DatabaseVersion;
041import org.syncany.database.DatabaseVersionHeader;
042import org.syncany.database.FileContent;
043import org.syncany.database.FileVersion;
044import org.syncany.database.MemoryDatabase;
045import org.syncany.database.MultiChunkEntry;
046import org.syncany.database.MultiChunkEntry.MultiChunkId;
047import org.syncany.database.PartialFileHistory;
048import org.syncany.database.SqlDatabase;
049import org.syncany.database.VectorClock;
050import org.syncany.database.dao.DatabaseXmlSerializer;
051import org.syncany.database.dao.DatabaseXmlSerializer.DatabaseReadType;
052import org.syncany.operations.AbstractTransferOperation;
053import org.syncany.operations.ChangeSet;
054import org.syncany.operations.cleanup.CleanupOperation;
055import org.syncany.operations.daemon.messages.UpEndSyncExternalEvent;
056import org.syncany.operations.daemon.messages.UpStartSyncExternalEvent;
057import org.syncany.operations.down.DownOperation;
058import org.syncany.operations.ls_remote.LsRemoteOperation;
059import org.syncany.operations.ls_remote.LsRemoteOperationResult;
060import org.syncany.operations.status.StatusOperation;
061import org.syncany.operations.status.StatusOperationResult;
062import org.syncany.operations.up.UpOperationResult.UpResultCode;
063import org.syncany.plugins.transfer.RemoteTransaction;
064import org.syncany.plugins.transfer.StorageException;
065import org.syncany.plugins.transfer.TransferManager;
066import org.syncany.plugins.transfer.files.DatabaseRemoteFile;
067import org.syncany.plugins.transfer.files.MultichunkRemoteFile;
068import org.syncany.plugins.transfer.files.TransactionRemoteFile;
069import org.syncany.plugins.transfer.to.ActionTO;
070import org.syncany.plugins.transfer.to.ActionTO.ActionStatus;
071import org.syncany.plugins.transfer.to.ActionTO.ActionType;
072import org.syncany.plugins.transfer.to.TransactionTO;
073
074/**
075 * The up operation implements a central part of Syncany's business logic. It analyzes the local
076 * folder, deduplicates new or changed files and uploads newly packed multichunks to the remote
077 * storage. The up operation is the complement to the {@link DownOperation}.
078 * 
079 * <p>The general operation flow is as follows:
080 * <ol>
081 *   <li>Load local database (if not already loaded)</li>
082 *   <li>Analyze local directory using the {@link StatusOperation} to determine any changed/new/deleted files</li>
083 *   <li>Determine if there are unknown remote databases using the {@link LsRemoteOperation}, and skip the rest if there are</li>
084 *   <li>If there are changes, use the {@link Deduper} and {@link Indexer} to create a new {@link DatabaseVersion}
085 *       (including new chunks, multichunks, file contents and file versions).</li>
086 *   <li>Upload new multichunks (if any) using a {@link TransferManager}</li>
087 *   <li>Save new {@link DatabaseVersion} to a new (delta) {@link MemoryDatabase} and upload it</li>
088 *   <li>Add delta database to local database and store it locally</li>
089 * </ol>
090 * 
091 * Besides the normal behavior of creating transactions from local changes and uploading these,
092 * this class is also able to upload existing transactions that have been interrupted during a previous upload attempt.
 * The up operation maps the local changes over a number of transactions. The size of these transactions is based
 * on the settings in {@link UpOperationOptions}.
095 * If a sequence of transactions is interrupted, all queued transactions are written to disk to be resumed later.
096 * The next up operation then reads these transactions and resumes them in the same order as they were queued before the interruption.
097 *
098 * @author Philipp C. Heckel (philipp.heckel@gmail.com)
099 */
100public class UpOperation extends AbstractTransferOperation {
        /** Class-wide logger, named after the simple class name. */
        private static final Logger logger = Logger.getLogger(UpOperation.class.getSimpleName());

        /** Identifier of this operation; it is handed to the superclass constructor. */
        public static final String ACTION_ID = "up";

        /** Options of this run (resume flag, status options, force-upload flag, transaction size/file limits). */
        private UpOperationOptions options;
        /** Result object that is filled while the operation runs and is returned by {@link #execute()}. */
        private UpOperationResult result;

        /** Local SQL database; created from the config in the constructor. */
        private SqlDatabase localDatabase;
        
        /**
         * Set by prepareResume(): true only if both the pending remote transactions and the pending
         * database versions could be loaded and their counts match.
         */
        private boolean resuming;
        /**
         * Transaction file found on the remote for the interrupted transaction, or null. It is handed to the
         * first commit in executeTransactions() and reset to null afterwards.
         */
        private TransactionRemoteFile transactionRemoteFileToResume;
        /** Transactions loaded by prepareResume(); iterated in executeTransactions() when resuming. */
        private Collection<RemoteTransaction> remoteTransactionsToResume;
        /**
         * Database versions waiting to be uploaded. Filled either by prepareResume() or by the indexer thread
         * started in startIndexerThread(); consumed by executeTransactions(). An empty database version
         * acts as the stop marker.
         */
        private BlockingQueue<DatabaseVersion> databaseVersionQueue;
114
        /**
         * Creates an up operation that uses a newly constructed {@link UpOperationOptions} object.
         *
         * @param config Configuration that is passed on to {@link #UpOperation(Config, UpOperationOptions)}.
         */
        public UpOperation(Config config) {
                this(config, new UpOperationOptions());
        }
118
119        public UpOperation(Config config, UpOperationOptions options) {
120                super(config, ACTION_ID);
121
122                this.options = options;
123                this.result = new UpOperationResult();
124                this.localDatabase = new SqlDatabase(config);
125                
126                this.resuming = false;
127                this.transactionRemoteFileToResume = null;
128                this.remoteTransactionsToResume = null;
129                this.databaseVersionQueue = new LinkedBlockingQueue<>();
130        }
131
132        @Override
133        public UpOperationResult execute() throws Exception {
134                logger.log(Level.INFO, "");
135                logger.log(Level.INFO, "Running 'Sync up' at client " + config.getMachineName() + " ...");
136                logger.log(Level.INFO, "--------------------------------------------");
137
138                fireStartEvent();
139
140                if (!checkPreconditions()) {
141                        fireEndEvent();
142                        return result;
143                }
144
145                // Upload action file (lock for cleanup)
146                startOperation();
147
148                try {
149                        if (options.isResume()) {
150                                prepareResume();                        
151                        }
152
153                        if (!resuming) {
154                                startIndexerThread(databaseVersionQueue);                       
155                        }
156
157                        // If we are not resuming from a remote transaction, we need to clean transactions.
158                        if (transactionRemoteFileToResume == null) {
159                                transferManager.cleanTransactions();
160                        }                       
161                }
162                catch (BlockingTransfersException e) {
163                        stopBecauseOfBlockingTransactions();
164                        return result;
165                }
166                
167                // Go wild
168                int numberOfPerformedTransactions = executeTransactions();
169                updateResult(numberOfPerformedTransactions);            
170
171                // Close database connection
172                localDatabase.finalize();
173
174                // Finish 'up' before 'cleanup' starts
175                finishOperation();
176                fireEndEvent();
177                
178                return result;
179        }
180
181        private void updateResult(int numberOfPerformedTransactions) {
182                if (numberOfPerformedTransactions == 0) {
183                        logger.log(Level.INFO, "Local database is up-to-date. NOTHING TO DO!");
184                        result.setResultCode(UpResultCode.OK_NO_CHANGES);
185                }
186                else {
187                        logger.log(Level.INFO, "Sync up done.");
188                        result.setResultCode(UpResultCode.OK_CHANGES_UPLOADED);
189                }
190        }
191
192        private void startIndexerThread(BlockingQueue<DatabaseVersion> databaseVersionQueue) {
193                // Get a list of files that have been updated
194                ChangeSet localChanges = result.getStatusResult().getChangeSet();
195                List<File> locallyUpdatedFiles = extractLocallyUpdatedFiles(localChanges);
196                List<File> locallyDeletedFiles = extractLocallyDeletedFiles(localChanges);
197                
198                // Iterate over the changes, deduplicate, and feed DatabaseVersions into an iterator
199                Deduper deduper = new Deduper(config.getChunker(), config.getMultiChunker(), config.getTransformer(), options.getTransactionSizeLimit(),
200                                options.getTransactionFileLimit());
201                
202                AsyncIndexer asyncIndexer = new AsyncIndexer(config, deduper, locallyUpdatedFiles, locallyDeletedFiles, databaseVersionQueue);
203                new Thread(asyncIndexer, "AsyncI/" + config.getLocalDir().getName()).start();
204        }
205
206        private void prepareResume() throws Exception { 
207                Collection<Long> versionsToResume = transferManager.loadPendingTransactionList();
208                boolean hasVersionsToResume = versionsToResume != null && versionsToResume.size() > 0;
209
210                if (hasVersionsToResume) {
211                        logger.log(Level.INFO, "Found local transaction to resume.");
212                        logger.log(Level.INFO, "Attempting to find transactionRemoteFile");
213
214                        remoteTransactionsToResume = attemptResumeTransactions(versionsToResume);
215                        Collection<DatabaseVersion> remoteDatabaseVersionsToResume = attemptResumeDatabaseVersions(versionsToResume);
216
217                        resuming = remoteDatabaseVersionsToResume != null && remoteTransactionsToResume != null &&
218                                        remoteDatabaseVersionsToResume.size() == remoteTransactionsToResume.size();
219                        
220                        if (resuming) {
221                                databaseVersionQueue.addAll(remoteDatabaseVersionsToResume);                            
222                                databaseVersionQueue.add(new DatabaseVersion()); // Empty database version is the stopping marker                       
223                                
224                                transactionRemoteFileToResume = attemptResumeTransactionRemoteFile();                   
225                        } 
226                        else {
227                                transferManager.clearResumableTransactions();
228                        }
229                }
230                else {
231                        transferManager.clearResumableTransactions();
232                }
233        }
234
235        /**
236         *      Transfers the given {@link DatabaseVersion} objects to the remote.
237         *      Each {@link DatabaseVersion} will be transferred in its own {@link RemoteTransaction} object.
238         *      
239         *      This method resumes an interrupted sequence of earlier transactions.
240         *      It expects the {@link DatabaseVersion} and {@link RemoteTransaction} files to be in the same order as they were originally generated.
241         *      The first {@link DatabaseVersion} and {@link RemoteTransaction} objects should match the interrupted transaction.
242         *
243         *      The assumption is that the given {@link RemoteTransaction} objects match the given {@link DatabaseVersion} objects.
244         *      The given {@link TransactionRemoteFile} corresponds to the file on the remote from the interrupted transaction.
245         *
246         *      @param databaseVersionQueue The {@link DatabaseVersion} objects to send to the remote.
247         *      @param remoteTransactionsToResume {@link RemoteTransaction} objects that correspond to the given {@link DatabaseVersion} objects.
248         *      @param transactionRemoteFileToResume The file on the remote that was used for the specific transaction that was interrupted.
249         */
250        private int executeTransactions() throws Exception {
251                Iterator<RemoteTransaction> remoteTransactionsToResumeIterator = (resuming) ? remoteTransactionsToResume.iterator() : null;
252                
253                // At this point, if a failure occurs from which we can resume, new transaction files will be written
254                // Delete any old transaction files
255                transferManager.clearPendingTransactions();
256
257                boolean detectedFailure = false;
258                Exception caughtFailure = null;
259                List<RemoteTransaction> remainingRemoteTransactions = new ArrayList<>();
260                List<DatabaseVersion> remainingDatabaseVersions = new ArrayList<>();
261                
262                DatabaseVersion databaseVersion = databaseVersionQueue.take();
263                boolean noDatabaseVersions = databaseVersion.isEmpty();
264                
265                // Add dirty data to first database
266                addDirtyData(databaseVersion);
267
268                //
269                while (!databaseVersion.isEmpty()) {
270                        RemoteTransaction remoteTransaction = null;
271                        
272                        if (!resuming) {
273                                VectorClock newVectorClock = findNewVectorClock();
274                                
275                                databaseVersion.setVectorClock(newVectorClock);
276                                databaseVersion.setTimestamp(new Date());
277                                databaseVersion.setClient(config.getMachineName());
278
279                                remoteTransaction = new RemoteTransaction(config, transferManager);
280
281                                // Add multichunks to transaction
282                                logger.log(Level.INFO, "Uploading new multichunks ...");
283                                
284                                // This call adds newly changed chunks to a "RemoteTransaction", so they can be uploaded later.
285                                addMultiChunksToTransaction(remoteTransaction, databaseVersion.getMultiChunks());
286                        }
287                        else {
288                                remoteTransaction = remoteTransactionsToResumeIterator.next();
289                        }
290
291                        logger.log(Level.INFO, "Uploading database: " + databaseVersion);
292
293                        // Create delta database and commit transaction
294                        // The information about file changes is written to disk to locally "commit" the transaction. This
295                        // enables Syncany to later resume the transaction if it is interrupted before completion.
296                        writeAndAddDeltaDatabase(remoteTransaction, databaseVersion, resuming);
297
298                        // This thread is to be run when the transaction is interrupted for connectivity reasons. It will serialize
299                        // the transaction and metadata in memory such that the transaction can be resumed later.
300                        Thread writeResumeFilesShutDownHook = createAndAddShutdownHook(remoteTransaction, databaseVersion);
301
302                        // This performs the actual sync to the remote. It is executed synchronously. Only after the changes
303                        // are confirmed to have been safely pushed to the remote, will the transaction be marked as complete.
304                        if (!detectedFailure) {
305                                boolean committingFailed = true;
306                                try {
307                                        if (transactionRemoteFileToResume == null) {
308                                                remoteTransaction.commit();
309                                        }
310                                        else {
311                                                remoteTransaction.commit(config.getTransactionFile(), transactionRemoteFileToResume);
312                                                transactionRemoteFileToResume = null;
313                                        }
314
315                                        logger.log(Level.INFO, "Persisting local SQL database (new database version {0}) ...", databaseVersion.getHeader().toString());
316                                        long newDatabaseVersionId = localDatabase.writeDatabaseVersion(databaseVersion);
317
318                                        logger.log(Level.INFO, "Removing DIRTY database versions from database ...");
319                                        localDatabase.removeDirtyDatabaseVersions(newDatabaseVersionId);
320
321                                        logger.log(Level.INFO, "Adding database version to result changes:" + databaseVersion);
322                                        addNewDatabaseChangesToResultChanges(databaseVersion, result.getChangeSet());
323
324                                        result.incrementTransactionsCompleted();
325
326
327                                        logger.log(Level.INFO, "Committing local database.");
328                                        localDatabase.commit();
329
330                                        committingFailed = false;
331                                }
332                                catch (Exception e) {
333                                        detectedFailure = true;
334                                        caughtFailure = e;
335                                }
336                                finally {
337                                        // The JVM has not shut down, so we can remove the shutdown hook.
338                                        // If it turns out that committing has failed, we run it explicitly.
339                                        removeShutdownHook(writeResumeFilesShutDownHook);
340
341                                        if (committingFailed) {
342                                                remainingRemoteTransactions.add(remoteTransaction);
343                                                remainingDatabaseVersions.add(databaseVersion);
344                                        }
345                                }
346                        }
347                        else {
348                                remainingRemoteTransactions.add(remoteTransaction);
349                                remainingDatabaseVersions.add(databaseVersion);
350                        }
351                        
352                        if (!noDatabaseVersions) {
353                                logger.log(Level.FINE, "Waiting for new database version.");
354                                databaseVersion = databaseVersionQueue.take();
355                                logger.log(Level.FINE, "Took new database version: " + databaseVersion);
356                        }
357                        else {
358                                logger.log(Level.FINE, "Not waiting for new database version, last one has been taken.");
359                                break;
360                        }
361
362                }
363
364                if (detectedFailure) {
365                        localDatabase.rollback();
366                        serializeRemoteTransactionsAndMetadata(remainingRemoteTransactions, remainingDatabaseVersions);
367                        throw caughtFailure;
368                }
369
370                return (int) result.getTransactionsCompleted();
371        }
372
373        private TransactionRemoteFile attemptResumeTransactionRemoteFile() throws StorageException, BlockingTransfersException {
374                TransactionRemoteFile transactionRemoteFile = null;
375
376                // They look for the matching transaction on the remote.
377                List<TransactionRemoteFile> transactions = transferManager.getTransactionsByClient(config.getMachineName());
378
379                // If there are blocking transactions, they stop completely.
380                // Not sure yet what these blocking structures are.
381                if (transactions == null) {
382                        // We have blocking transactions
383                        stopBecauseOfBlockingTransactions();
384                        throw new BlockingTransfersException();
385                }
386
387                // There is no sign of the transaction on the remote. Clean up the local transaction.
388                if (transactions.size() != 1) {
389                        logger.log(Level.INFO, "Unable to find (unique) transactionRemoteFile. Not resuming.");
390                        transferManager.clearResumableTransactions();
391                }
392                // Remote transaction file found.
393                else {
394                        transactionRemoteFile = transactions.get(0);
395                }
396                
397                return transactionRemoteFile;
398        }
399
400        /**
401         * This method creates a Thread, which serializes the {@link remoteTransaction} in the state at the time the thread is run,
402         * as well as the {@link DatabaseVersion} that contains the metadata about what is uploaded in this transaction.
403         *
404         * @param newDatabaseVersion DatabaseVersion that contains everything that should be locally saved when current transaction is resumed.
405         *
406         * @return Thread which is attached as a shutdownHook.
407         */
408        private Thread createAndAddShutdownHook(final RemoteTransaction remoteTransaction, final DatabaseVersion newDatabaseVersion) {
409                Thread writeResumeFilesShutDownHook = new Thread(new Runnable() {
410                        @Override
411                        public void run() {
412                                serializeRemoteTransactionsAndMetadata(Arrays.asList(remoteTransaction), Arrays.asList(newDatabaseVersion));
413                        }
414                }, "ResumeShtdwn");
415
416                logger.log(Level.INFO, "Adding shutdown hook (to allow resuming the upload) ...");
417
418                Runtime.getRuntime().addShutdownHook(writeResumeFilesShutDownHook);
419                return writeResumeFilesShutDownHook;
420        }
421
422        private void removeShutdownHook(Thread writeResumeFilesShutDownHook) {
423                Runtime.getRuntime().removeShutdownHook(writeResumeFilesShutDownHook);
424        }
425
426        private void fireStartEvent() {
427                eventBus.post(new UpStartSyncExternalEvent(config.getLocalDir().getAbsolutePath()));
428        }
429
430        private void fireEndEvent() {
431                eventBus.post(new UpEndSyncExternalEvent(config.getLocalDir().getAbsolutePath(), result.getResultCode(), result.getChangeSet()));
432        }
433
434        /**
435         * This method sets the correct {@link UpResultCode} when another client has a transaction in progress with deletions.
436         */
437        private void stopBecauseOfBlockingTransactions() throws StorageException {
438                logger.log(Level.INFO, "Another client is blocking the repo with unfinished cleanup.");
439                result.setResultCode(UpResultCode.NOK_REPO_BLOCKED);
440
441                finishOperation();
442                fireEndEvent();
443        }
444
445        /**
446         * This method checks if:
447         *
448         * <ul>
449         *      <li>If there are local changes => No need for Up.</li>
450         *  <li>If another clients is running Cleanup => Not allowed to upload.</li>
451         *  <li>If remote changes exist => Should Down first.</li>
452         * </ul>
453         *
454         * @returns boolean true if Up can and should be done, false otherwise.
455         */
456        private boolean checkPreconditions() throws Exception {
457                // Find local changes
458                StatusOperation statusOperation = new StatusOperation(config, options.getStatusOptions());
459                StatusOperationResult statusOperationResult = statusOperation.execute();
460                ChangeSet localChanges = statusOperationResult.getChangeSet();
461
462                result.getStatusResult().setChangeSet(localChanges);
463
464                if (!localChanges.hasChanges()) {
465                        logger.log(Level.INFO, "Local database is up-to-date (change set). NOTHING TO DO!");
466                        result.setResultCode(UpResultCode.OK_NO_CHANGES);
467
468                        return false;
469                }
470
471                // Check if other operations are running
472                if (otherRemoteOperationsRunning(CleanupOperation.ACTION_ID)) {
473                        logger.log(Level.INFO, "* Cleanup running. Skipping down operation.");
474                        result.setResultCode(UpResultCode.NOK_UNKNOWN_DATABASES);
475
476                        return false;
477                }
478
479                // Find remote changes (unless --force is enabled)
480                if (!options.forceUploadEnabled()) {
481                        LsRemoteOperationResult lsRemoteOperationResult = new LsRemoteOperation(config, transferManager).execute();
482                        List<DatabaseRemoteFile> unknownRemoteDatabases = lsRemoteOperationResult.getUnknownRemoteDatabases();
483
484                        if (unknownRemoteDatabases.size() > 0) {
485                                logger.log(Level.INFO, "There are remote changes. Call 'down' first or use --force-upload you must, Luke!");
486                                logger.log(Level.FINE, "Unknown remote databases are: " + unknownRemoteDatabases);
487                                result.setResultCode(UpResultCode.NOK_UNKNOWN_DATABASES);
488
489                                return false;
490                        }
491                        else {
492                                logger.log(Level.INFO, "No remote changes, ready to upload.");
493                        }
494                }
495                else {
496                        logger.log(Level.INFO, "Force (--force-upload) is enabled, ignoring potential remote changes.");
497                }
498
499                return true;
500        }
501
502        /**
503         * This method takes the metadata that is to be uploaded, loads it into a {@link MemoryDatabase} and serializes
504         * it to a file. If this is not a resumption of a previous transaction, this file is added to the transaction.
505         * Finally, databaseversions that are uploaded are remembered as known, such that they are not downloaded in future Downs.
506         *
507         * @param newDatabaseVersion {@link DatabaseVersion} containing all metadata that would be locally persisted if the transaction succeeds.
508         * @param resuming boolean indicating if the current transaction is in the process of being resumed.
509         */
510        private void writeAndAddDeltaDatabase(RemoteTransaction remoteTransaction, DatabaseVersion newDatabaseVersion, boolean resuming)
511                        throws InterruptedException, StorageException,
512                        IOException,
513                        SQLException {
514                // Clone database version (necessary, because the original must not be touched)
515                DatabaseVersion deltaDatabaseVersion = newDatabaseVersion.clone();
516
517                // New delta database
518                MemoryDatabase deltaDatabase = new MemoryDatabase();
519                deltaDatabase.addDatabaseVersion(deltaDatabaseVersion);
520
521                // Save delta database locally
522                long newestLocalDatabaseVersion = getNewestDatabaseFileVersion(config.getMachineName(), localDatabase.getKnownDatabases());
523                DatabaseRemoteFile remoteDeltaDatabaseFile = new DatabaseRemoteFile(config.getMachineName(), newestLocalDatabaseVersion + 1);
524                File localDeltaDatabaseFile = config.getCache().getDatabaseFile(remoteDeltaDatabaseFile.getName());
525
526                logger.log(Level.INFO, "Saving local delta database, version {0} to file {1} ... ", new Object[] { deltaDatabaseVersion.getHeader(),
527                                localDeltaDatabaseFile });
528
529                saveDeltaDatabase(deltaDatabase, localDeltaDatabaseFile);
530
531                if (!resuming) {
532                        // Upload delta database, if we are not resuming (in which case the db is in the transaction already)
533                        logger.log(Level.INFO, "- Uploading local delta database file ...");
534                        addLocalDatabaseToTransaction(remoteTransaction, localDeltaDatabaseFile, remoteDeltaDatabaseFile);
535                }
536                // Remember uploaded database as known.
537                List<DatabaseRemoteFile> newDatabaseRemoteFiles = new ArrayList<DatabaseRemoteFile>();
538                newDatabaseRemoteFiles.add(remoteDeltaDatabaseFile);
539                localDatabase.writeKnownRemoteDatabases(newDatabaseRemoteFiles);
540        }
541
542        /**
543         * Serializes a {@link MemoryDatabase} to a file, using the configured transformer.
544         */
545        protected void saveDeltaDatabase(MemoryDatabase db, File localDatabaseFile) throws IOException {
546                logger.log(Level.INFO, "- Saving database to " + localDatabaseFile + " ...");
547
548                DatabaseXmlSerializer dao = new DatabaseXmlSerializer(config.getTransformer());
549                dao.save(db.getDatabaseVersions(), localDatabaseFile);
550        }
551
552        /**
553         * This methods iterates over all {@link DatabaseVersion}s that are dirty. Dirty means that they are not in the winning
554         * branch. All data which is contained in these dirty DatabaseVersions is added to the newDatabaseVersion, so that it
555         * is included in the new Up. Note that only metadata is reuploaded, the actual multichunks are still in the repository.
556         *
557         * @param newDatabaseVersion {@link DatabaseVersion} to which dirty data should be added.
558         */
559        private void addDirtyData(DatabaseVersion newDatabaseVersion) {
560                Iterator<DatabaseVersion> dirtyDatabaseVersions = localDatabase.getDirtyDatabaseVersions();
561
562                if (!dirtyDatabaseVersions.hasNext()) {
563                        logger.log(Level.INFO, "No DIRTY data found in database (no dirty databases); Nothing to do here.");
564                }
565                else {
566                        logger.log(Level.INFO, "Adding DIRTY data to new database version: ");
567
568                        while (dirtyDatabaseVersions.hasNext()) {
569                                DatabaseVersion dirtyDatabaseVersion = dirtyDatabaseVersions.next();
570
571                                logger.log(Level.INFO, "- Adding chunks/multichunks/filecontents from database version " + dirtyDatabaseVersion.getHeader());
572
573                                for (ChunkEntry chunkEntry : dirtyDatabaseVersion.getChunks()) {
574                                        newDatabaseVersion.addChunk(chunkEntry);
575                                }
576
577                                for (MultiChunkEntry multiChunkEntry : dirtyDatabaseVersion.getMultiChunks()) {
578                                        newDatabaseVersion.addMultiChunk(multiChunkEntry);
579                                }
580
581                                for (FileContent fileContent : dirtyDatabaseVersion.getFileContents()) {
582                                        newDatabaseVersion.addFileContent(fileContent);
583                                }
584                        }
585                }
586        }
587
588        /**
589         * This method extracts the files that are new or changed from a {@link ChangeSet} of local changes.
590         *
591         * @param localChanges {@link ChangeSet} that was the result from a StatusOperation.
592         *
593         * @return a list of Files that are new or have been changed.
594         */
595        private List<File> extractLocallyUpdatedFiles(ChangeSet localChanges) {
596                List<File> locallyUpdatedFiles = new ArrayList<File>();
597
598                for (String relativeFilePath : localChanges.getNewFiles()) {
599                        locallyUpdatedFiles.add(new File(config.getLocalDir() + File.separator + relativeFilePath));
600                }
601
602                for (String relativeFilePath : localChanges.getChangedFiles()) {
603                        locallyUpdatedFiles.add(new File(config.getLocalDir() + File.separator + relativeFilePath));
604                }
605
606                return locallyUpdatedFiles;
607        }
608
609        private List<File> extractLocallyDeletedFiles(ChangeSet localChanges) {
610                List<File> locallyDeletedFiles = new ArrayList<File>();
611
612                for (String relativeFilePath : localChanges.getDeletedFiles()) {
613                        locallyDeletedFiles.add(new File(config.getLocalDir() + File.separator + relativeFilePath));
614                }
615
616                return locallyDeletedFiles;
617        }
618
619        /**
620         * This method fills a {@link ChangeSet} with the files and changes that are uploaded, to include in
621         * the {@link UpOperationResult}.
622         *
623         * @param newDatabaseVersion {@link DatabaseVersion} that contains the changes.
624         * @param resultChanges {@ChangeSet} to which these changes are to be added.
625         */
626        private void addNewDatabaseChangesToResultChanges(DatabaseVersion newDatabaseVersion, ChangeSet resultChanges) {
627                for (PartialFileHistory partialFileHistory : newDatabaseVersion.getFileHistories()) {
628                        FileVersion lastFileVersion = partialFileHistory.getLastVersion();
629
630                        switch (lastFileVersion.getStatus()) {
631                        case NEW:
632                                resultChanges.getNewFiles().add(lastFileVersion.getPath());
633                                break;
634
635                        case CHANGED:
636                        case RENAMED:
637                                resultChanges.getChangedFiles().add(lastFileVersion.getPath());
638                                break;
639
640                        case DELETED:
641                                resultChanges.getDeletedFiles().add(lastFileVersion.getPath());
642                                break;
643                        }
644                }
645        }
646
647        /**
648         * This methods adds the multichunks that are not yet present in the remote repo to the {@link RemoteTransaction} for
649         * uploading. Multichunks are not uploaded if they are dirty.
650         *
651         * @param multiChunkEntries Collection of multiChunkEntries that are included in the new {@link DatabaseVersion}
652         */
653        private void addMultiChunksToTransaction(RemoteTransaction remoteTransaction, Collection<MultiChunkEntry> multiChunksEntries)
654                        throws InterruptedException, StorageException {
655                List<MultiChunkId> dirtyMultiChunkIds = localDatabase.getDirtyMultiChunkIds();
656
657                for (MultiChunkEntry multiChunkEntry : multiChunksEntries) {
658                        if (dirtyMultiChunkIds.contains(multiChunkEntry.getId())) {
659                                logger.log(Level.INFO, "- Ignoring multichunk (from dirty database, already uploaded), " + multiChunkEntry.getId() + " ...");
660                        }
661                        else {
662                                File localMultiChunkFile = config.getCache().getEncryptedMultiChunkFile(multiChunkEntry.getId());
663                                MultichunkRemoteFile remoteMultiChunkFile = new MultichunkRemoteFile(multiChunkEntry.getId());
664
665                                logger.log(Level.INFO, "- Uploading multichunk {0} from {1} to {2} ...", new Object[] { multiChunkEntry.getId(), localMultiChunkFile,
666                                                remoteMultiChunkFile });
667
668                                remoteTransaction.upload(localMultiChunkFile, remoteMultiChunkFile);
669                        }
670                }
671        }
672
673        private void addLocalDatabaseToTransaction(RemoteTransaction remoteTransaction, File localDatabaseFile, DatabaseRemoteFile remoteDatabaseFile)
674                        throws InterruptedException,
675                        StorageException {
676                
677                logger.log(Level.INFO, "- Uploading " + localDatabaseFile + " to " + remoteDatabaseFile + " ...");
678                remoteTransaction.upload(localDatabaseFile, remoteDatabaseFile);
679        }
680
681        /**
682         * Finds the next vector clock
683         *
684         * <p>There are two causes for not having a previous vector clock:
685         * <ul>
686         *   <li>This is the initial version
687         *   <li>A cleanup has wiped *all* database versions
688         * </ul>
689         *
690         * In the latter case, the method looks at the previous database version numbers
691         * to determine a new vector clock
692         */
693        private VectorClock findNewVectorClock() {
694                // Get last vector clock
695                DatabaseVersionHeader lastDatabaseVersionHeader = localDatabase.getLastDatabaseVersionHeader();
696                VectorClock lastVectorClock = (lastDatabaseVersionHeader != null) ? lastDatabaseVersionHeader.getVectorClock() : new VectorClock();
697
698                logger.log(Level.INFO, "Last vector clock was: " + lastVectorClock);
699
700                boolean noPreviousVectorClock = lastVectorClock.isEmpty();
701
702                if (noPreviousVectorClock) {
703                        lastVectorClock = localDatabase.getHighestKnownDatabaseFilenameNumbers();
704                }
705
706                VectorClock newVectorClock = lastVectorClock.clone();
707
708                Long lastLocalValue = lastVectorClock.getClock(config.getMachineName());
709                Long lastDirtyLocalValue = localDatabase.getMaxDirtyVectorClock(config.getMachineName());
710
711                Long newLocalValue = null;
712
713                if (lastDirtyLocalValue != null) {
714                        newLocalValue = lastDirtyLocalValue + 1;
715                }
716                else {
717                        if (lastLocalValue != null) {
718                                newLocalValue = lastLocalValue + 1;
719                        }
720                        else {
721                                newLocalValue = 1L;
722                        }
723                }
724
725                newVectorClock.setClock(config.getMachineName(), newLocalValue);
726
727                return newVectorClock;
728        }
729
730        private Collection<RemoteTransaction> attemptResumeTransactions(Collection<Long> versions) {
731                try {
732                        Collection<RemoteTransaction> remoteTransactions = new ArrayList<>();
733
734                        for (Long version : versions) {
735                                File transactionFile = config.getTransactionFile(version);
736
737                                // If a single transaction file is missing, we should restart
738                                if (!transactionFile.exists()) {
739                                        return null;
740                                }
741
742                                TransactionTO transactionTO = TransactionTO.load(null, transactionFile);
743
744                                // Verify if all files needed are in cache.
745                                for (ActionTO action : transactionTO.getActions()) {
746                                        if (action.getType() == ActionType.UPLOAD) {
747                                                if (action.getStatus() == ActionStatus.UNSTARTED) {
748                                                        if (!action.getLocalTempLocation().exists()) {
749                                                                // Unstarted upload has no cached local copy, abort
750                                                                return null;
751                                                        }
752                                                }
753                                        }
754                                }
755
756                                remoteTransactions.add(new RemoteTransaction(config, transferManager, transactionTO));
757                        }
758                        
759                        return remoteTransactions;
760                } catch (Exception e) {
761                        logger.log(Level.WARNING, "Invalid transaction file. Cannot resume!");
762                        return null;
763                }
764        }
765
766        private Collection<DatabaseVersion> attemptResumeDatabaseVersions(Collection<Long> versions) throws Exception {
767                try {
768                        Collection<DatabaseVersion> databaseVersions = new ArrayList<>();
769                        
770                        for (Long version : versions) {
771                                File databaseFile = config.getTransactionDatabaseFile(version);
772
773                                // If a single database file is missing, we should restart
774                                if (!databaseFile.exists()) {
775                                        return null;
776                                }
777
778                                DatabaseXmlSerializer databaseSerializer = new DatabaseXmlSerializer();
779                                MemoryDatabase memoryDatabase = new MemoryDatabase();
780                                databaseSerializer.load(memoryDatabase, databaseFile, null, null, DatabaseReadType.FULL);
781
782                                if (memoryDatabase.getDatabaseVersions().size() == 0) {
783                                        return null;
784                                }
785
786                                databaseVersions.add(memoryDatabase.getLastDatabaseVersion());
787                        }
788                        
789                        return databaseVersions;                        
790                } catch (Exception e) {
791                        logger.log(Level.WARNING, "Cannot load database versions from 'state'. Cannot resume.");
792                        return null;
793                }
794        }
795
796        /**
797         * Serializes both the remote transaction and the current database version
798         * that would be added if Up was successful.
799         * @param newDatabaseVersion the current metadata
800         */
801        private void serializeRemoteTransactionsAndMetadata(List<RemoteTransaction> remoteTransactions, List<DatabaseVersion> newDatabaseVersions) {
802                try {
803                        logger.log(Level.INFO, "Persisting status of UpOperation to " + config.getStateDir() + " ...");
804
805                        // Collect a list of all database version numbers that will be saved
806                        List<Long> databaseVersionClocks = new ArrayList<>();
807                        for (int i = 0; i < remoteTransactions.size(); i++) {
808                                DatabaseVersion databaseVersion = newDatabaseVersions.get(i);
809                                long databaseVersionClock = databaseVersion.getVectorClock().getClock(config.getMachineName());
810                                databaseVersionClocks.add(databaseVersionClock);
811                        }
812
813                        // Write the list of version number to a file, before serializing any transactions!
814                        // This ensures that no transaction files can exist without a "reference" to them.
815                        File transactionListFile = config.getTransactionListFile();
816                        PrintWriter transactionListWriter = new PrintWriter(new OutputStreamWriter(
817                                        new FileOutputStream(transactionListFile), "UTF-8"));
818                        for (Long databaseVersion : databaseVersionClocks) {
819                                transactionListWriter.println(databaseVersion);
820                        }
821                        transactionListWriter.close();
822
823                        // For each database version write the transaction and database files
824                        for (int i = 0; i < remoteTransactions.size(); i++) {
825                                DatabaseVersion databaseVersion = newDatabaseVersions.get(i);
826                                long databaseVersionClock = databaseVersionClocks.get(i);
827
828                                // Writing transaction file to state dir
829                                remoteTransactions.get(i).writeToFile(null, config.getTransactionFile(databaseVersionClock));
830
831                                // Writing database representation of new database version to state dir
832                                MemoryDatabase memoryDatabase = new MemoryDatabase();
833                                memoryDatabase.addDatabaseVersion(databaseVersion);
834
835                                DatabaseXmlSerializer dao = new DatabaseXmlSerializer();
836                                dao.save(memoryDatabase.getDatabaseVersions(), config.getTransactionDatabaseFile(databaseVersionClock));
837                        }
838
839                        // The first transaction may be resumable, so write it to the default transaction file
840                        remoteTransactions.get(0).writeToFile(null, config.getTransactionFile());
841                }
842                catch (Exception e) {
843                        logger.log(Level.WARNING, "Failure when persisting status of Up: ", e);
844                }
845        }
846}