001package co.codewizards.cloudstore.core.repo.sync; 002 003import static co.codewizards.cloudstore.core.objectfactory.ObjectFactoryUtil.*; 004import static co.codewizards.cloudstore.core.oio.OioFileFactory.*; 005import static co.codewizards.cloudstore.core.util.AssertUtil.*; 006import static co.codewizards.cloudstore.core.util.HashUtil.*; 007import static co.codewizards.cloudstore.core.util.Util.*; 008 009import java.net.MalformedURLException; 010import java.net.URL; 011import java.util.ArrayList; 012import java.util.Collection; 013import java.util.HashMap; 014import java.util.Iterator; 015import java.util.List; 016import java.util.Map; 017import java.util.Set; 018import java.util.SortedMap; 019import java.util.TreeMap; 020import java.util.UUID; 021import java.util.concurrent.Callable; 022import java.util.concurrent.ExecutorService; 023import java.util.concurrent.Executors; 024import java.util.concurrent.Future; 025 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029import co.codewizards.cloudstore.core.dto.ChangeSetDto; 030import co.codewizards.cloudstore.core.dto.ConfigPropSetDto; 031import co.codewizards.cloudstore.core.dto.CopyModificationDto; 032import co.codewizards.cloudstore.core.dto.DeleteModificationDto; 033import co.codewizards.cloudstore.core.dto.DirectoryDto; 034import co.codewizards.cloudstore.core.dto.FileChunkDto; 035import co.codewizards.cloudstore.core.dto.ModificationDto; 036import co.codewizards.cloudstore.core.dto.NormalFileDto; 037import co.codewizards.cloudstore.core.dto.RepoFileDto; 038import co.codewizards.cloudstore.core.dto.RepoFileDtoTreeNode; 039import co.codewizards.cloudstore.core.dto.SymlinkDto; 040import co.codewizards.cloudstore.core.dto.VersionInfoDto; 041import co.codewizards.cloudstore.core.oio.File; 042import co.codewizards.cloudstore.core.progress.ProgressMonitor; 043import co.codewizards.cloudstore.core.progress.SubProgressMonitor; 044import co.codewizards.cloudstore.core.repo.local.LocalRepoHelper; 045import co.codewizards.cloudstore.core.repo.local.LocalRepoManager; 046import co.codewizards.cloudstore.core.repo.local.LocalRepoManagerFactory; 047import co.codewizards.cloudstore.core.repo.transport.CollisionException; 048import co.codewizards.cloudstore.core.repo.transport.LocalRepoTransport; 049import co.codewizards.cloudstore.core.repo.transport.RepoTransport; 050import co.codewizards.cloudstore.core.repo.transport.RepoTransportFactory; 051import co.codewizards.cloudstore.core.repo.transport.RepoTransportFactoryRegistry; 052import co.codewizards.cloudstore.core.repo.transport.TransferDoneMarkerType; 053import co.codewizards.cloudstore.core.util.UrlUtil; 054import co.codewizards.cloudstore.core.version.VersionCompatibilityValidator; 055 056/** 057 * Logic for synchronising a local with a remote repository. 058 * @author Marco หงุ่ยตระกูล-Schulze - marco at codewizards dot co 059 */ 060public class RepoToRepoSync implements AutoCloseable { 061 private static final Logger logger = LoggerFactory.getLogger(RepoToRepoSync.class); 062 063 /** 064 * Sync in the inverse direction. This is only for testing whether the RepoTransport implementations 065 * are truly symmetric. It is less efficient! Therefore, this must NEVER be true in production!!! 066 */ 067 private static final boolean TEST_INVERSE = false; 068 069 protected final File localRoot; 070 protected final URL remoteRoot; 071 protected final LocalRepoManager localRepoManager; 072 protected final LocalRepoTransport localRepoTransport; 073 protected final RepoTransport remoteRepoTransport; 074 protected final UUID localRepositoryId; 075 protected final UUID remoteRepositoryId; 076 077 private ExecutorService localSyncExecutor; 078 private Future<Void> localSyncFuture; 079 080 /** 081 * Create an instance. 082 * @param localRoot the root of the local repository or any file/directory inside it. This is 083 * automatically adjusted to fit the connection-point to the remote repository (the remote 084 * repository might be connected to a sub-directory). 085 * @param remoteRoot the root of the remote repository. This must exactly match the connection point. 086 * If a sub-directory of the remote repository is connected to the local repository, this sub-directory 087 * must be referenced here. 088 */ 089 protected RepoToRepoSync(File localRoot, final URL remoteRoot) { 090 final File localRootWithoutPathPrefix = LocalRepoHelper.getLocalRootContainingFile(assertNotNull(localRoot, "localRoot")); 091 this.remoteRoot = UrlUtil.canonicalizeURL(assertNotNull(remoteRoot, "remoteRoot")); 092 localRepoManager = LocalRepoManagerFactory.Helper.getInstance().createLocalRepoManagerForExistingRepository(localRootWithoutPathPrefix); 093 this.localRoot = localRoot = createFile(localRootWithoutPathPrefix, localRepoManager.getLocalPathPrefixOrFail(remoteRoot)); 094 095 localRepositoryId = localRepoManager.getRepositoryId(); 096 if (localRepositoryId == null) 097 throw new IllegalStateException("localRepoManager.getRepositoryId() returned null!"); 098 099 remoteRepositoryId = localRepoManager.getRemoteRepositoryIdOrFail(remoteRoot); 100 101 remoteRepoTransport = createRepoTransport(remoteRoot, localRepositoryId); 102 localRepoTransport = (LocalRepoTransport) createRepoTransport(localRoot, remoteRepositoryId); 103 } 104 105 public static RepoToRepoSync create(final File localRoot, final URL remoteRoot) { 106 return createObject(RepoToRepoSync.class, localRoot, remoteRoot); 107 } 108 109 public void sync(final ProgressMonitor monitor) { 110 assertNotNull(monitor, "monitor"); 111 monitor.beginTask("Synchronising...", 201); 112 try { 113 final VersionInfoDto clientVersionInfoDto = localRepoTransport.getVersionInfoDto(); 114 final VersionInfoDto serverVersionInfoDto = remoteRepoTransport.getVersionInfoDto(); 115 VersionCompatibilityValidator.getInstance().validate(clientVersionInfoDto, serverVersionInfoDto); 116 117 readRemoteRepositoryIdFromRepoTransport(); 118 monitor.worked(1); 119 120 if (localSyncExecutor != null) 121 throw new IllegalStateException("localSyncExecutor != null"); 122 123 if (localSyncFuture != null) 124 throw new IllegalStateException("localSyncFuture != null"); 125 126 localSyncExecutor = Executors.newFixedThreadPool(1); 127 localSyncFuture = localSyncExecutor.submit(new Callable<Void>() { 128 @Override 129 public Void call() throws Exception { 130 logger.info("sync: locally syncing {} ('{}')", localRepositoryId, localRoot); 131 localRepoManager.localSync(new SubProgressMonitor(monitor, 50)); 132 return null; 133 } 134 }); 135 136 if (!TEST_INVERSE) { // This is the normal sync (NOT test). 137 syncDown(true, new SubProgressMonitor(monitor, 50)); 138 139 if (localSyncExecutor != null) 140 throw new IllegalStateException("localSyncExecutor != null"); 141 142 if (localSyncFuture != null) 143 throw new IllegalStateException("localSyncFuture != null"); 144 145 syncUp(new SubProgressMonitor(monitor, 50)); 146 // Immediately sync back to make sure the changes we caused don't cause problems later 147 // (right now there's very likely no collision and this should be very fast). 148 syncDown(false, new SubProgressMonitor(monitor, 50)); 149 } 150 else { // THIS IS FOR TESTING ONLY! 151 logger.info("sync: locally syncing on *remote* side {} ('{}')", localRepositoryId, localRoot); 152 remoteRepoTransport.getChangeSetDto(true); // trigger the local sync on the remote side (we don't need the change set) 153 154 waitForAndCheckLocalSyncFuture(); 155 156 syncUp(new SubProgressMonitor(monitor, 50)); 157 syncDown(false, new SubProgressMonitor(monitor, 50)); 158 syncUp(new SubProgressMonitor(monitor, 50)); 159 } 160 } finally { 161 monitor.done(); 162 } 163 } 164 165 protected void syncUp(final ProgressMonitor monitor) { 166 logger.info("syncUp: fromID={} from='{}' toID={} to='{}'", 167 localRepositoryId, localRoot, remoteRepositoryId, remoteRoot); 168 sync(localRepoTransport, false, remoteRepoTransport, monitor); 169 } 170 171 protected void syncDown(final boolean fromRepoLocalSync, final ProgressMonitor monitor) { 172 logger.info("syncDown: fromID={} from='{}' toID={} to='{}', fromRepoLocalSync={}", 173 remoteRepositoryId, remoteRoot, localRepositoryId, localRoot, fromRepoLocalSync); 174 sync(remoteRepoTransport, fromRepoLocalSync, localRepoTransport, monitor); 175 } 176 177 private void waitForAndCheckLocalSyncFutureIfExists() { 178 if (localSyncFuture != null) 179 waitForAndCheckLocalSyncFuture(); 180 } 181 182 private void waitForAndCheckLocalSyncFuture() { 183 try { 184 assertNotNull(localSyncFuture, "localSyncFuture").get(); 185 } catch (final RuntimeException e) { 186 throw e; 187 } catch (final Exception e) { 188 throw new RuntimeException(e); 189 } 190 assertNotNull(localSyncExecutor, "localSyncExecutor").shutdown(); 191 localSyncFuture = null; 192 localSyncExecutor = null; 193 } 194 195 private void readRemoteRepositoryIdFromRepoTransport() { 196 final UUID repositoryId = remoteRepoTransport.getRepositoryId(); 197 if (repositoryId == null) 198 throw new IllegalStateException("remoteRepoTransport.getRepositoryId() returned null!"); 199 200 if (!repositoryId.equals(remoteRepositoryId)) 201 throw new IllegalStateException( 202 String.format("remoteRepoTransport.getRepositoryId() does not match repositoryId in local DB! %s != %s", repositoryId, remoteRepositoryId)); 203 } 204 205 private RepoTransport createRepoTransport(final File rootFile, final UUID clientRepositoryId) { 206 URL rootURL; 207 try { 208 rootURL = rootFile.toURI().toURL(); 209 } catch (final MalformedURLException e) { 210 throw new RuntimeException(e); 211 } 212 return createRepoTransport(rootURL, clientRepositoryId); 213 } 214 215 private RepoTransport createRepoTransport(final URL remoteRoot, final UUID clientRepositoryId) { 216 final RepoTransportFactory repoTransportFactory = RepoTransportFactoryRegistry.getInstance().getRepoTransportFactoryOrFail(remoteRoot); 217 return repoTransportFactory.createRepoTransport(remoteRoot, clientRepositoryId); 218 } 219 220 protected void sync(final RepoTransport fromRepoTransport, final boolean fromRepoLocalSync, final RepoTransport toRepoTransport, final ProgressMonitor monitor) { 221 monitor.beginTask("Synchronising...", 100); 222 try { 223 final ChangeSetDto changeSetDto = fromRepoTransport.getChangeSetDto(fromRepoLocalSync); 224 monitor.worked(8); 225 226 waitForAndCheckLocalSyncFutureIfExists(); 227 toRepoTransport.prepareForChangeSetDto(changeSetDto); 228 sync(fromRepoTransport, toRepoTransport, changeSetDto, new SubProgressMonitor(monitor, 90)); 229 230 fromRepoTransport.endSyncFromRepository(); 231 toRepoTransport.endSyncToRepository(changeSetDto.getRepositoryDto().getRevision()); 232 monitor.worked(2); 233 } finally { 234 monitor.done(); 235 } 236 } 237 238 protected void sync(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 239 final ChangeSetDto changeSetDto, final ProgressMonitor monitor) { 240 monitor.beginTask("Synchronising...", 1 + changeSetDto.getModificationDtos().size() + 3 * changeSetDto.getRepoFileDtos().size() + 1); 241 try { 242 syncParentConfigPropSetDto(fromRepoTransport, toRepoTransport, changeSetDto.getParentConfigPropSetDto(), 243 new SubProgressMonitor(monitor, 1)); 244 245 final RepoFileDtoTreeNode repoFileDtoTree = RepoFileDtoTreeNode.createTree(changeSetDto.getRepoFileDtos()); 246 if (repoFileDtoTree != null) { 247 sync(fromRepoTransport, toRepoTransport, repoFileDtoTree, 248 new Class<?>[] { DirectoryDto.class }, new Class<?>[0], false, 249 new SubProgressMonitor(monitor, repoFileDtoTree.size())); 250 } 251 252 syncModifications(fromRepoTransport, toRepoTransport, changeSetDto.getModificationDtos(), 253 new SubProgressMonitor(monitor, changeSetDto.getModificationDtos().size())); 254 255 if (repoFileDtoTree != null) { 256 sync(fromRepoTransport, toRepoTransport, repoFileDtoTree, 257 new Class<?>[] { RepoFileDto.class }, new Class<?>[] { DirectoryDto.class }, true, 258 new SubProgressMonitor(monitor, repoFileDtoTree.size())); 259 } 260 261 if (repoFileDtoTree != null) { 262 sync(fromRepoTransport, toRepoTransport, repoFileDtoTree, 263 new Class<?>[] { RepoFileDto.class }, new Class<?>[] { DirectoryDto.class }, false, 264 new SubProgressMonitor(monitor, repoFileDtoTree.size())); 265 } 266 } finally { 267 monitor.done(); 268 } 269 } 270 271 protected void syncParentConfigPropSetDto(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 272 final ConfigPropSetDto parentConfigPropSetDto, final ProgressMonitor monitor) { 273 assertNotNull(fromRepoTransport, "fromRepoTransport"); 274 assertNotNull(toRepoTransport, "toRepoTransport"); 275 // parentConfigPropSetDto may be null! 276 assertNotNull(monitor, "monitor"); 277 278 monitor.beginTask("Synchronising parent-config...", 1); 279 try { 280 if (parentConfigPropSetDto == null) 281 return; 282 283 toRepoTransport.putParentConfigPropSetDto(parentConfigPropSetDto); 284 } finally { 285 monitor.done(); 286 } 287 } 288 289 protected void sync(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 290 final RepoFileDtoTreeNode repoFileDtoTree, 291 final Class<?>[] repoFileDtoClassesIncl, final Class<?>[] repoFileDtoClassesExcl, final boolean filesInProgressOnly, 292 final ProgressMonitor monitor) { 293 assertNotNull(fromRepoTransport, "fromRepoTransport"); 294 assertNotNull(toRepoTransport, "toRepoTransport"); 295 assertNotNull(repoFileDtoTree, "repoFileDtoTree"); 296 assertNotNull(repoFileDtoClassesIncl, "repoFileDtoClassesIncl"); 297 assertNotNull(repoFileDtoClassesExcl, "repoFileDtoClassesExcl"); 298 assertNotNull(monitor, "monitor"); 299 300 final Map<Class<?>, Boolean> repoFileDtoClass2Included = new HashMap<Class<?>, Boolean>(); 301 final Map<Class<?>, Boolean> repoFileDtoClass2Excluded = new HashMap<Class<?>, Boolean>(); 302 303 final Set<String> fileInProgressPaths = filesInProgressOnly 304 ? localRepoTransport.getFileInProgressPaths(fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId()) 305 : null; 306 307 monitor.beginTask("Synchronising...", repoFileDtoTree.size()); 308 try { 309 for (final RepoFileDtoTreeNode repoFileDtoTreeNode : repoFileDtoTree) { 310 if (repoFileDtoTreeNode.getRepoFileDto().isNeededAsParent()) { // not actually modified - serves only to complete the tree structure. 311 monitor.worked(1); 312 continue; 313 } 314 315 if (fileInProgressPaths != null && ! fileInProgressPaths.contains(repoFileDtoTreeNode.getPath())) { 316 monitor.worked(1); 317 continue; 318 } 319 final RepoFileDto repoFileDto = repoFileDtoTreeNode.getRepoFileDto(); 320 final Class<? extends RepoFileDto> repoFileDtoClass = repoFileDto.getClass(); 321 322 Boolean included = repoFileDtoClass2Included.get(repoFileDtoClass); 323 if (included == null) { 324 included = false; 325 for (final Class<?> clazz : repoFileDtoClassesIncl) { 326 if (clazz.isAssignableFrom(repoFileDtoClass)) { 327 included = true; 328 break; 329 } 330 } 331 repoFileDtoClass2Included.put(repoFileDtoClass, included); 332 } 333 334 Boolean excluded = repoFileDtoClass2Excluded.get(repoFileDtoClass); 335 if (excluded == null) { 336 excluded = false; 337 for (final Class<?> clazz : repoFileDtoClassesExcl) { 338 if (clazz.isAssignableFrom(repoFileDtoClass)) { 339 excluded = true; 340 break; 341 } 342 } 343 repoFileDtoClass2Excluded.put(repoFileDtoClass, excluded); 344 } 345 346 if (!included || excluded) { 347 monitor.worked(1); 348 continue; 349 } 350 351 if (isDone(fromRepoTransport, toRepoTransport, repoFileDto)) { 352 logger.debug("sync: Skipping file already done in an interrupted transfer before: {}", repoFileDtoTreeNode.getPath()); 353 monitor.worked(1); 354 continue; 355 } 356 357 if (repoFileDto instanceof DirectoryDto) 358 syncDirectory(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, (DirectoryDto) repoFileDto, new SubProgressMonitor(monitor, 1)); 359 else if (repoFileDto instanceof NormalFileDto) { 360 syncFile(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, repoFileDto, monitor); 361 } 362 else if (repoFileDto instanceof SymlinkDto) 363 syncSymlink(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, (SymlinkDto) repoFileDto, new SubProgressMonitor(monitor, 1)); 364 else 365 throw new IllegalStateException("Unsupported RepoFileDto type: " + repoFileDto); 366 367 markDone(fromRepoTransport, toRepoTransport, repoFileDto); 368 } 369 } finally { 370 monitor.done(); 371 } 372 } 373 374 private boolean isDone(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final RepoFileDto repoFileDto) { 375 return localRepoTransport.isTransferDone( 376 fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), 377 TransferDoneMarkerType.REPO_FILE, repoFileDto.getId(), repoFileDto.getLocalRevision()); 378 } 379 380 private void markDone(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final RepoFileDto repoFileDto) { 381 localRepoTransport.markTransferDone( 382 fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), 383 TransferDoneMarkerType.REPO_FILE, repoFileDto.getId(), repoFileDto.getLocalRevision()); 384 } 385 386 private boolean isDone(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final ModificationDto modificationDto) { 387 return localRepoTransport.isTransferDone( 388 fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), 389 TransferDoneMarkerType.MODIFICATION, modificationDto.getId(), modificationDto.getLocalRevision()); 390 } 391 392 private void markDone(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final ModificationDto modificationDto) { 393 localRepoTransport.markTransferDone( 394 fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), 395 TransferDoneMarkerType.MODIFICATION, modificationDto.getId(), modificationDto.getLocalRevision()); 396 } 397 398 private SortedMap<Long, Collection<ModificationDto>> getLocalRevision2ModificationDtos(final Collection<ModificationDto> modificationDtos) { 399 final SortedMap<Long, Collection<ModificationDto>> map = new TreeMap<Long, Collection<ModificationDto>>(); 400 for (final ModificationDto modificationDto : modificationDtos) { 401 final long localRevision = modificationDto.getLocalRevision(); 402 Collection<ModificationDto> collection = map.get(localRevision); 403 if (collection == null) { 404 collection = new ArrayList<ModificationDto>(); 405 map.put(localRevision, collection); 406 } 407 collection.add(modificationDto); 408 } 409 return map; 410 } 411 412 private void syncModifications(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final Collection<ModificationDto> modificationDtos, final ProgressMonitor monitor) { 413 monitor.beginTask("Synchronising...", modificationDtos.size()); 414 try { 415 final SortedMap<Long,Collection<ModificationDto>> localRevision2ModificationDtos = getLocalRevision2ModificationDtos(modificationDtos); 416 for (final Map.Entry<Long,Collection<ModificationDto>> me : localRevision2ModificationDtos.entrySet()) { 417 final ModificationDtoSet modificationDtoSet = new ModificationDtoSet(me.getValue()); 418 419 for (final List<CopyModificationDto> copyModificationDtos : modificationDtoSet.getFromPath2CopyModificationDtos().values()) { 420 for (final Iterator<CopyModificationDto> itCopyMod = copyModificationDtos.iterator(); itCopyMod.hasNext(); ) { 421 final CopyModificationDto copyModificationDto = itCopyMod.next(); 422 423 if (isDone(fromRepoTransport, toRepoTransport, copyModificationDto)) { 424 logger.debug("sync: Skipping CopyModificaton already done in an interrupted transfer before: {} => {}", copyModificationDto.getFromPath(), copyModificationDto.getToPath()); 425 monitor.worked(1); 426 continue; 427 } 428 429 final List<DeleteModificationDto> deleteModificationDtos = modificationDtoSet.getPath2DeleteModificationDtos().get(copyModificationDto.getFromPath()); 430 boolean moveInstead = false; 431 if (!itCopyMod.hasNext() && deleteModificationDtos != null && !deleteModificationDtos.isEmpty()) 432 moveInstead = true; 433 434 if (moveInstead) { 435 logger.info("syncModifications: Moving from '{}' to '{}'", copyModificationDto.getFromPath(), copyModificationDto.getToPath()); 436 toRepoTransport.move(copyModificationDto.getFromPath(), copyModificationDto.getToPath()); 437 } 438 else { 439 logger.info("syncModifications: Copying from '{}' to '{}'", copyModificationDto.getFromPath(), copyModificationDto.getToPath()); 440 toRepoTransport.copy(copyModificationDto.getFromPath(), copyModificationDto.getToPath()); 441 } 442 443 if (!moveInstead && deleteModificationDtos != null) { 444 for (final DeleteModificationDto deleteModificationDto : deleteModificationDtos) { 445 logger.info("syncModifications: Deleting '{}'", deleteModificationDto.getPath()); 446 applyDeleteModification(fromRepoTransport, toRepoTransport, deleteModificationDto); 447 } 448 } 449 450 markDone(fromRepoTransport, toRepoTransport, copyModificationDto); 451 } 452 } 453 454 for (final List<DeleteModificationDto> deleteModificationDtos : modificationDtoSet.getPath2DeleteModificationDtos().values()) { 455 for (final DeleteModificationDto deleteModificationDto : deleteModificationDtos) { 456 if (isDone(fromRepoTransport, toRepoTransport, deleteModificationDto)) { 457 logger.debug("sync: Skipping DeleteModificaton already done in an interrupted transfer before: {}", deleteModificationDto.getPath()); 458 monitor.worked(1); 459 continue; 460 } 461 462 logger.info("syncModifications: Deleting '{}'", deleteModificationDto.getPath()); 463 applyDeleteModification(fromRepoTransport, toRepoTransport, deleteModificationDto); 464 465 markDone(fromRepoTransport, toRepoTransport, deleteModificationDto); 466 } 467 } 468 } 469 } finally { 470 monitor.done(); 471 } 472 } 473 474 protected void applyDeleteModification(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final DeleteModificationDto deleteModificationDto) { 475 assertNotNull(fromRepoTransport, "fromRepoTransport"); 476 assertNotNull(toRepoTransport, "toRepoTransport"); 477 assertNotNull(deleteModificationDto, "deleteModificationDto"); 478 479 try { 480 delete(fromRepoTransport, toRepoTransport, deleteModificationDto); 481 } catch (final CollisionException x) { // Note: This cannot happen in CloudStore! But in can happen in downstream projects with different RepoTransport implementations! 482 logger.info("CollisionException during delete: {}", deleteModificationDto.getPath()); 483 if (logger.isDebugEnabled()) 484 logger.debug(x.toString(), x); 485 486 return; 487 } 488 } 489 490 protected void delete(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final DeleteModificationDto deleteModificationDto) { 491 toRepoTransport.delete(deleteModificationDto.getPath()); 492 } 493 494 private void syncDirectory( 495 final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 496 final RepoFileDtoTreeNode repoFileDtoTreeNode, final DirectoryDto directoryDto, final ProgressMonitor monitor) { 497 monitor.beginTask("Synchronising...", 100); 498 try { 499 final String path = repoFileDtoTreeNode.getPath(); 500 logger.info("syncDirectory: path='{}'", path); 501 try { 502 makeDirectory(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, directoryDto); 503 } catch (final CollisionException x) { 504 logger.info("CollisionException during makeDirectory: {}", path); 505 if (logger.isDebugEnabled()) 506 logger.debug(x.toString(), x); 507 508 return; 509 } 510 } finally { 511 monitor.done(); 512 } 513 } 514 515 protected void makeDirectory(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 516 final RepoFileDtoTreeNode repoFileDtoTreeNode, final String path, final DirectoryDto directoryDto) { 517 toRepoTransport.makeDirectory(path, directoryDto.getLastModified()); 518 } 519 520 private void syncSymlink( 521 final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 522 final RepoFileDtoTreeNode repoFileDtoTreeNode, final SymlinkDto symlinkDto, final SubProgressMonitor monitor) { 523 monitor.beginTask("Synchronising...", 100); 524 try { 525 final String path = repoFileDtoTreeNode.getPath(); 526 try { 527 toRepoTransport.makeSymlink(path, symlinkDto.getTarget(), symlinkDto.getLastModified()); 528 } catch (final CollisionException x) { 529 logger.info("CollisionException during makeSymlink: {}", path); 530 if (logger.isDebugEnabled()) 531 logger.debug(x.toString(), x); 532 533 return; 534 } 535 } finally { 536 monitor.done(); 537 } 538 } 539 540 private void syncFile(final RepoTransport fromRepoTransport, 541 final RepoTransport toRepoTransport, final RepoFileDtoTreeNode repoFileDtoTreeNode, 542 final RepoFileDto normalFileDto, final ProgressMonitor monitor) { 543 monitor.beginTask("Synchronising...", 100); 544 try { 545 final String path = repoFileDtoTreeNode.getPath(); 546 logger.info("syncFile: path='{}'", path); 547 548 final RepoFileDto fromRepoFileDto = fromRepoTransport.getRepoFileDto(path); 549 if (fromRepoFileDto == null) { 550 logger.warn("File was deleted during sync on source side: {}", path); 551 return; 552 } 553 if (!(fromRepoFileDto instanceof NormalFileDto)) { 554 logger.warn("Normal file was replaced by a directory (or another type) during sync on source side: {}", path); 555 return; 556 } 557 monitor.worked(10); 558 559 final NormalFileDto fromNormalFileDto = (NormalFileDto) fromRepoFileDto; 560 561 final RepoFileDto toRepoFileDto = toRepoTransport.getRepoFileDto(path); 562 if (areFilesExistingAndEqual(fromRepoFileDto, toRepoFileDto)) { 563 logger.info("File is already equal on destination side (sha1='{}'): {}", fromNormalFileDto.getSha1(), path); 564 return; 565 } 566 monitor.worked(10); 567 568 logger.info("Beginning to copy file (from.sha1='{}' to.sha1='{}'): {}", fromNormalFileDto.getSha1(), 569 toRepoFileDto instanceof NormalFileDto ? ((NormalFileDto)toRepoFileDto).getSha1() : "<NoInstanceOf_NormalFileDto>", 570 path); 571 572 final NormalFileDto toNormalFileDto; 573 if (toRepoFileDto instanceof NormalFileDto) 574 toNormalFileDto = (NormalFileDto) toRepoFileDto; 575 else 576 toNormalFileDto = createObject(NormalFileDto.class); // dummy (null-object pattern) 577 578 try { 579 beginPutFile(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, fromNormalFileDto); 580 } catch (final CollisionException x) { 581 logger.info("CollisionException during beginPutFile: {}", path); 582 if (logger.isDebugEnabled()) 583 logger.debug(x.toString(), x); 584 585 return; 586 } 587 localRepoTransport.markFileInProgress(fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), path, true); 588 monitor.worked(1); 589 590 final Map<Long, FileChunkDto> offset2ToTempFileChunkDto = new HashMap<>(toNormalFileDto.getTempFileChunkDtos().size()); 591 for (final FileChunkDto toTempFileChunkDto : toNormalFileDto.getTempFileChunkDtos()) 592 offset2ToTempFileChunkDto.put(toTempFileChunkDto.getOffset(), toTempFileChunkDto); 593 594 logger.debug("Comparing {} FileChunkDtos. path='{}'", fromNormalFileDto.getFileChunkDtos().size(), path); 595 final List<FileChunkDto> fromFileChunkDtosDirty = new ArrayList<FileChunkDto>(); 596 final Iterator<FileChunkDto> toFileChunkDtoIterator = toNormalFileDto.getFileChunkDtos().iterator(); 597 int fileChunkIndex = -1; 598 for (final FileChunkDto fromFileChunkDto : fromNormalFileDto.getFileChunkDtos()) { 599 final FileChunkDto toFileChunkDto = toFileChunkDtoIterator.hasNext() ? toFileChunkDtoIterator.next() : null; 600 ++fileChunkIndex; 601 final FileChunkDto toTempFileChunkDto = offset2ToTempFileChunkDto.get(fromFileChunkDto.getOffset()); 602 if (toTempFileChunkDto == null) { 603 if (toFileChunkDto != null 604 && equal(fromFileChunkDto.getOffset(), toFileChunkDto.getOffset()) 605 && equal(fromFileChunkDto.getLength(), toFileChunkDto.getLength()) 606 && equal(fromFileChunkDto.getSha1(), toFileChunkDto.getSha1())) { 607 if (logger.isTraceEnabled()) { 608 logger.trace("Skipping clean FileChunkDto. index={} offset={} sha1='{}'", 609 fileChunkIndex, fromFileChunkDto.getOffset(), fromFileChunkDto.getSha1()); 610 } 611 continue; 612 } 613 } 614 else { 615 if (equal(fromFileChunkDto.getOffset(), toTempFileChunkDto.getOffset()) 616 && equal(fromFileChunkDto.getLength(), toTempFileChunkDto.getLength()) 617 && equal(fromFileChunkDto.getSha1(), toTempFileChunkDto.getSha1())) { 618 if (logger.isTraceEnabled()) { 619 logger.trace("Skipping clean temporary FileChunkDto. index={} offset={} sha1='{}'", 620 fileChunkIndex, fromFileChunkDto.getOffset(), fromFileChunkDto.getSha1()); 621 } 622 continue; 623 } 624 } 625 626 if (logger.isTraceEnabled()) { 627 logger.trace("Enlisting dirty FileChunkDto. index={} fromOffset={} toOffset={} fromSha1='{}' toSha1='{}'", 628 fileChunkIndex, fromFileChunkDto.getOffset(), 629 (toFileChunkDto == null ? "null" : toFileChunkDto.getOffset()), 630 fromFileChunkDto.getSha1(), 631 (toFileChunkDto == null ? "null" : toFileChunkDto.getSha1())); 632 } 633 fromFileChunkDtosDirty.add(fromFileChunkDto); 634 } 635 636 logger.info("Need to copy {} dirty file-chunks (of {} total). path='{}'", 637 fromFileChunkDtosDirty.size(), fromNormalFileDto.getFileChunkDtos().size(), path); 638 639 final ProgressMonitor subMonitor = new SubProgressMonitor(monitor, 73); 640 subMonitor.beginTask("Synchronising...", fromFileChunkDtosDirty.size()); 641 fileChunkIndex = -1; 642 long bytesCopied = 0; 643 final long copyChunksBeginTimestamp = System.currentTimeMillis(); 644 for (final FileChunkDto fileChunkDto : fromFileChunkDtosDirty) { 645 ++fileChunkIndex; 646 if (logger.isTraceEnabled()) { 647 logger.trace("Reading data for dirty FileChunkDto (index {} of {}). path='{}' offset={}", 648 fileChunkIndex, fromFileChunkDtosDirty.size(), path, fileChunkDto.getOffset()); 649 } 650 final byte[] fileData = getFileData(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, fileChunkDto); 651 652 if (fileData == null) { 653 logger.warn("Source file was modified or deleted during sync: {}", path); 654 // The file is left in state 'inProgress'. Thus it should definitely not be synced back in the opposite 655 // direction. The file should be synced again in the correct direction in the next run (after the source 656 // repo did a local sync, too). 657 return; 658 } 659 660 if (logger.isTraceEnabled()) { 661 logger.trace("Writing data for dirty FileChunkDto ({} of {}). path='{}' offset={}", 662 fileChunkIndex + 1, fromFileChunkDtosDirty.size(), path, fileChunkDto.getOffset()); 663 } 664 665 try { 666 putFileData(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, fileChunkDto, fileData); 667 } catch (final CollisionException x) { // Never happens in CloudStore, but in down-stream-projects. Important: They must handle this properly themselves! 668 logger.info("CollisionException during putFileData: {}", path); 669 if (logger.isDebugEnabled()) 670 logger.debug(x.toString(), x); 671 672 return; 673 } 674 675 bytesCopied += fileData.length; 676 subMonitor.worked(1); 677 } 678 subMonitor.done(); 679 680 logger.info("Copied {} dirty file-chunks with together {} bytes in {} ms. path='{}'", 681 fromFileChunkDtosDirty.size(), bytesCopied, System.currentTimeMillis() - copyChunksBeginTimestamp, path); 682 683 endPutFile(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, fromNormalFileDto); 684 localRepoTransport.markFileInProgress(fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), path, false); 685 monitor.worked(6); 686 } finally { 687 monitor.done(); 688 } 689 } 690 691 protected byte[] getFileData(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 692 final RepoFileDtoTreeNode repoFileDtoTreeNode, 693 final String path, final FileChunkDto fileChunkDto) { 694 695 final byte[] fileData = fromRepoTransport.getFileData(path, fileChunkDto.getOffset(), fileChunkDto.getLength()); 696 if (fileData == null) 697 return null; // file was deleted 698 699 if (fileData.length != fileChunkDto.getLength() || !sha1(fileData).equals(fileChunkDto.getSha1())) 700 return null; // file was modified 701 702 return fileData; 703 } 704 705 protected void putFileData(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 706 final RepoFileDtoTreeNode repoFileDtoTreeNode, 707 final String path, final FileChunkDto fileChunkDto, 708 final byte[] fileData) { 709 710 toRepoTransport.putFileData(path, fileChunkDto.getOffset(), fileData); 711 } 712 713 protected void beginPutFile(final RepoTransport fromRepoTransport, 714 final RepoTransport toRepoTransport, final RepoFileDtoTreeNode repoFileDtoTreeNode, 715 final String path, final NormalFileDto fromNormalFileDto) throws CollisionException { 716 717 toRepoTransport.beginPutFile(path); 718 } 719 720 protected void endPutFile(final RepoTransport fromRepoTransport, 721 final RepoTransport toRepoTransport, final RepoFileDtoTreeNode repoFileDtoTreeNode, 722 final String path, final NormalFileDto fromNormalFileDto) { 723 724 toRepoTransport.endPutFile( 725 path, fromNormalFileDto.getLastModified(), 726 fromNormalFileDto.getLength(), fromNormalFileDto.getSha1()); 727 } 728 729 private boolean areFilesExistingAndEqual(final RepoFileDto fromRepoFileDto, final RepoFileDto toRepoFileDto) { 730 if (!(fromRepoFileDto instanceof NormalFileDto)) 731 return false; 732 733 if (!(toRepoFileDto instanceof NormalFileDto)) 734 return false; 735 736 final NormalFileDto fromNormalFileDto = (NormalFileDto) fromRepoFileDto; 737 final NormalFileDto toNormalFileDto = (NormalFileDto) toRepoFileDto; 738 739 return equal(fromNormalFileDto.getLength(), toNormalFileDto.getLength()) 740 && equal(fromNormalFileDto.getLastModified(), toNormalFileDto.getLastModified()) 741 && equal(fromNormalFileDto.getSha1(), toNormalFileDto.getSha1()); 742 } 743 744 @Override 745 public void close() { 746 localRepoManager.close(); 747 localRepoTransport.close(); 748 remoteRepoTransport.close(); 749 } 750}