Java Mailing List Archive

http://www.gg3721.com/

Home » Hibernate Commits List »

[hibernate-commits] Hibernate SVN: r14933 -
 search/trunk/src/java/org/hibernate/search/store.

hibernate-commits

2008-07-15


Author Login Post Reply
Author: sannegrinovero
Date: 2008-07-15 17:42:16 -0400 (Tue, 15 Jul 2008)
New Revision: 14933

Modified:
 search/trunk/src/java/org/hibernate/search/store/FSMasterDirectoryProvider.java
 search/trunk/src/java/org/hibernate/search/store/FSSlaveDirectoryProvider.java
Log:
HSEARCH-189 : visibility issues in concurrent code for Master/Slave DirectoryProviders

Modified: search/trunk/src/java/org/hibernate/search/store/FSMasterDirectoryProvider.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/store/FSMasterDirectoryProvider.java  2008-07-15 21:27:03 UTC (rev 14932)
+++ search/trunk/src/java/org/hibernate/search/store/FSMasterDirectoryProvider.java  2008-07-15 21:42:16 UTC (rev 14933)
@@(protected) @@
import java.util.TimerTask;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.io.File;
import java.io.IOException;
@@(protected) @@
public class FSMasterDirectoryProvider implements DirectoryProvider<FSDirectory> {
 
 private final Logger log = LoggerFactory.getLogger( FSMasterDirectoryProvider.class );
+  private final Timer timer = new Timer( true ); //daemon thread, the copy algorithm is robust
 
+  private volatile int current;
+  
+  //variables having visibility granted by a read of "current"
 private FSDirectory directory;
-  private int current;
 private String indexName;
-  private Timer timer;
 private SearchFactoryImplementor searchFactory;
 private long copyChunkSize;

-  //variables needed between initialize and start
+  //variables needed between initialize and start (used by same thread: no special care needed)
 private File sourceDir;
 private File indexDir;
 private String directoryProviderName;
@@(protected) @@
   }
   copyChunkSize = DirectoryProviderHelper.getCopyBufferSize( directoryProviderName, properties );
   this.searchFactory = searchFactoryImplementor;
+    current = 0; //write to volatile to publish all state
 }

 public void start() {
-    long period = DirectoryProviderHelper.getRefreshPeriod( properties, directoryProviderName );
+    int currentLocal = 0;
   try {
     //copy to source
     if ( new File( sourceDir, "current1").exists() ) {
-        current = 2;
+        currentLocal = 2;
     }
     else if ( new File( sourceDir, "current2").exists() ) {
-        current = 1;
+        currentLocal = 1;
     }
     else {
       log.debug( "Source directory for '{}' will be initialized", indexName);
-        current = 1;
+        currentLocal = 1;
     }
-      String currentString = Integer.valueOf( current ).toString();
+      String currentString = Integer.valueOf( currentLocal ).toString();
     File subDir = new File( sourceDir, currentString );
     FileHelper.synchronize( indexDir, subDir, true, copyChunkSize );
     new File( sourceDir, "current1 ").delete();
     new File( sourceDir, "current2" ).delete();
     //TODO small hole, no file can be found here
     new File( sourceDir, "current" + currentString ).createNewFile();
-      log.debug( "Current directory: {}", current );
+      log.debug( "Current directory: {}", currentLocal );
   }
   catch (IOException e) {
     throw new SearchException( "Unable to initialize index: " + directoryProviderName, e );
   }
-    timer = new Timer( true ); //daemon thread, the copy algorithm is robust
   TimerTask task = new FSMasterDirectoryProvider.TriggerTask( indexDir, sourceDir, this );
+    long period = DirectoryProviderHelper.getRefreshPeriod( properties, directoryProviderName );
   timer.scheduleAtFixedRate( task, period, period );
+    this.current = currentLocal; //write to volatile to publish all state
 }

 public FSDirectory getDirectory() {
+    @SuppressWarnings("unused")
+    int readCurrentState = current; //Unneeded value, needed to ensure visibility of state protected by memory barrier
   return directory;
 }

@@(protected) @@
   // after initialize call
   if ( obj == this ) return true;
   if ( obj == null || !( obj instanceof FSMasterDirectoryProvider ) ) return false;
-    return indexName.equals( ( (FSMasterDirectoryProvider) obj ).indexName );
+    FSMasterDirectoryProvider other = (FSMasterDirectoryProvider)obj;
+    //break both memory barriers by reading volatile variables:
+    @SuppressWarnings("unused")
+    int readCurrentState = other.current;
+    readCurrentState = this.current;
+    return indexName.equals( other.indexName );
 }

 @Override
@@(protected) @@
   // this code is actually broken since the value change after initialize call
   // but from a practical POV this is fine since we only call this method
   // after initialize call
+    @SuppressWarnings("unused")
+    int readCurrentState = current; //Unneeded value, to ensure visibility of state protected by memory barrier
   int hash = 11;
   return 37 * hash + indexName.hashCode();
 }

-
-
 public void stop() {
+    @SuppressWarnings("unused")
+    int readCurrentState = current; //Another unneeded value, to ensure visibility of state protected by memory barrier
   timer.cancel();
   try {
     directory.close();
@@(protected) @@
   }
 }

-  class TriggerTask extends TimerTask {
+  private class TriggerTask extends TimerTask {

   private final Executor executor;
   private final FSMasterDirectoryProvider.CopyDirectory copyTask;

-    public TriggerTask(File source, File destination, DirectoryProvider directoryProvider) {
+    public TriggerTask(File source, File destination, DirectoryProvider<FSDirectory> directoryProvider) {
     executor = Executors.newSingleThreadExecutor();
     copyTask = new FSMasterDirectoryProvider.CopyDirectory( source, destination, directoryProvider );
   }

   public void run() {
-      if ( ! copyTask.inProgress ) {
+      if ( copyTask.inProgress.compareAndSet( false, true ) ) {
       executor.execute( copyTask );
     }
     else {
@@(protected) @@
   }
 }

-  class CopyDirectory implements Runnable {
+  private class CopyDirectory implements Runnable {
   private final File source;
   private final File destination;
-    private volatile boolean inProgress;
-    private Lock directoryProviderLock;
-    private DirectoryProvider directoryProvider;
+    private final AtomicBoolean inProgress = new AtomicBoolean( false );
+    private final Lock directoryProviderLock;

-    public CopyDirectory(File source, File destination, DirectoryProvider directoryProvider) {
+    public CopyDirectory(File source, File destination, DirectoryProvider<FSDirectory> directoryProvider) {
     this.source = source;
     this.destination = destination;
-      this.directoryProvider = directoryProvider;
+      this.directoryProviderLock = searchFactory.getDirectoryProviderLock( directoryProvider );
   }

   public void run() {
     //TODO get rid of current and use the marker file instead?
-      long start = System.currentTimeMillis();
-      inProgress = true;
-      if ( directoryProviderLock == null ) {
-        directoryProviderLock = searchFactory.getDirectoryProviderLock( directoryProvider );
-        directoryProvider = null;
-        searchFactory = null; //get rid of any useless link (help hot redeployment?)
-      }
+      directoryProviderLock.lock();
     try {
-        directoryProviderLock.lock();
+        long start = System.currentTimeMillis();//keep time after lock is acquired for correct measure
       int oldIndex = current;
-        int index = current == 1 ? 2 : 1;
-
+        int index = oldIndex == 1 ? 2 : 1;
       File destinationFile = new File( destination, Integer.valueOf(index).toString() );
       try {
         log.trace( "Copying {} into {}", source, destinationFile );
@@(protected) @@
       catch (IOException e) {
         //don't change current
         log.error( "Unable to synchronize source of " + indexName, e );
-          inProgress = false;
         return;
       }
       if ( ! new File( destination, "current" + oldIndex ).delete() ) {
@@(protected) @@
       catch( IOException e ) {
         log.warn( "Unable to create current marker in source of " + indexName, e );
       }
+        log.trace( "Copy for {} took {} ms", indexName, (System.currentTimeMillis() - start) );
     }
     finally {
       directoryProviderLock.unlock();
-        inProgress = false;
+        inProgress.set( false );
     }
-      log.trace( "Copy for {} took {} ms", indexName, (System.currentTimeMillis() - start) );
   }
 }
}

Modified: search/trunk/src/java/org/hibernate/search/store/FSSlaveDirectoryProvider.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/store/FSSlaveDirectoryProvider.java  2008-07-15 21:27:03 UTC (rev 14932)
+++ search/trunk/src/java/org/hibernate/search/store/FSSlaveDirectoryProvider.java  2008-07-15 21:42:16 UTC (rev 14933)
@@(protected) @@
import java.util.TimerTask;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.io.File;
import java.io.IOException;

@@(protected) @@
public class FSSlaveDirectoryProvider implements DirectoryProvider<FSDirectory> {
 
 private final Logger log = LoggerFactory.getLogger( FSSlaveDirectoryProvider.class );
+  private final Timer timer = new Timer( true ); //daemon thread, the copy algorithm is robust
 
+  private volatile int current; //used also as memory barrier of all other values, which are set once.
+  
+  //variables having visibility granted by a read of "current"
 private FSDirectory directory1;
 private FSDirectory directory2;
-  private int current;
 private String indexName;
-  private Timer timer;
 private long copyChunkSize;
 
-  //variables needed between initialize and start
+  //variables needed between initialize and start (used by same thread: no special care needed)
 private File sourceIndexDir;
 private File indexDir;
 private String directoryProviderName;
@@(protected) @@
     throw new SearchException( "Unable to initialize index: " + directoryProviderName, e );
   }
   copyChunkSize = DirectoryProviderHelper.getCopyBufferSize( directoryProviderName, properties );
+    current = 0; //publish all state to other threads
 }

 public void start() {
-    long period = DirectoryProviderHelper.getRefreshPeriod( properties, directoryProviderName );
+    int readCurrentState = current; //Unneeded value, but ensure visibility of state protected by memory barrier
+    int currentToBe = 0;
   try {
-      directory1 = DirectoryProviderHelper.createFSIndex( new File(indexDir, "1") );
-      directory2 = DirectoryProviderHelper.createFSIndex( new File(indexDir, "2") );
+      directory1 = DirectoryProviderHelper.createFSIndex( new File( indexDir, "1" ) );
+      directory2 = DirectoryProviderHelper.createFSIndex( new File( indexDir, "2" ) );
     File currentMarker = new File( indexDir, "current1" );
     File current2Marker = new File( indexDir, "current2" );
     if ( currentMarker.exists() ) {
-        current = 1;
+        currentToBe = 1;
       if ( current2Marker.exists() ) {
         current2Marker.delete(); //TODO or throw an exception?
       }
     }
     else if ( current2Marker.exists() ) {
-        current = 2;
+        currentToBe = 2;
     }
     else {
       //no default
       log.debug( "Setting directory 1 as current");
-        current = 1;
-        File destinationFile = new File( indexDir, Integer.valueOf( current ).toString() );
+        currentToBe = 1;
+        File destinationFile = new File( indexDir, Integer.valueOf( readCurrentState ).toString() );
       int sourceCurrent;
       if ( new File( sourceIndexDir, "current1").exists() ) {
         sourceCurrent = 1;
@@(protected) @@
         throw new AssertionFailure( "No current file marker found in source directory: " + sourceIndexDir.getPath() );
       }
       try {
-          FileHelper.synchronize( new File( sourceIndexDir, String.valueOf(sourceCurrent) ),
+          FileHelper.synchronize( new File( sourceIndexDir, String.valueOf( sourceCurrent ) ),
             destinationFile, true, copyChunkSize );
       }
       catch (IOException e) {
@@(protected) @@
         throw new SearchException( "Unable to create the directory marker file: " + indexName );
       }
     }
-      log.debug( "Current directory: {}", current);
+      log.debug( "Current directory: {}", currentToBe);
   }
   catch (IOException e) {
     throw new SearchException( "Unable to initialize index: " + directoryProviderName, e );
   }
-    timer = new Timer(true); //daemon thread, the copy algorithm is robust
   TimerTask task = new TriggerTask( sourceIndexDir, indexDir );
+    long period = DirectoryProviderHelper.getRefreshPeriod( properties, directoryProviderName );
   timer.scheduleAtFixedRate( task, period, period );
+    this.current = currentToBe;
 }

-  //FIXME this is Thread-Unsafe! A memory barrier is missing.
 public FSDirectory getDirectory() {
-    if (current == 1) {
+    int readState = current;// to have the read consistent in the next two "if"s.
+    if (readState == 1) {
     return directory1;
   }
-    else if (current == 2) {
+    else if (readState == 2) {
     return directory2;
   }
   else {
-      throw new AssertionFailure( "Illegal current directory: " + current );
+      throw new AssertionFailure( "Illegal current directory: " + readState );
   }
 }

@@(protected) @@
   // after initialize call
   if ( obj == this ) return true;
   if ( obj == null || !( obj instanceof FSSlaveDirectoryProvider ) ) return false;
-    return indexName.equals( ( (FSSlaveDirectoryProvider) obj ).indexName );
+    FSSlaveDirectoryProvider other = (FSSlaveDirectoryProvider)obj;
+    //need to break memory barriers on both instances:
+    @SuppressWarnings("unused")
+    int readCurrentState = this.current; //unneded value, but ensure visibility of indexName
+    readCurrentState = other.current; //another unneded value, but ensure visibility of indexName
+    return indexName.equals( other.indexName );
 }

 @Override
@@(protected) @@
   // this code is actually broken since the value change after initialize call
   // but from a practical POV this is fine since we only call this method
   // after initialize call
+    @SuppressWarnings("unused")
+    int readCurrentState = current; //unneded value, but ensure visibility of indexName
   int hash = 11;
   return 37 * hash + indexName.hashCode();
 }
@@(protected) @@
   }

   public void run() {
-      if ( ! copyTask.inProgress ) {
+      if ( copyTask.inProgress.compareAndSet( false, true ) ) {
       executor.execute( copyTask );
     }
     else {
-        log.trace( "Skipping directory synchronization, previous work still in progress: {}", indexName);
+        if (log.isTraceEnabled()) {
+          @SuppressWarnings("unused")
+          int unneeded = current;//ensure visibility of indexName in Timer threads.
+          log.trace( "Skipping directory synchronization, previous work still in progress: {}", indexName);
+        }
     }
   }
 }
@@(protected) @@
 class CopyDirectory implements Runnable {
   private final File source;
   private final File destination;
-    private volatile boolean inProgress;
+    private final AtomicBoolean inProgress = new AtomicBoolean( false );

   public CopyDirectory(File sourceIndexDir, File destination) {
     this.source = sourceIndexDir;
@@(protected) @@
   public void run() {
     long start = System.currentTimeMillis();
     try {
-        inProgress = true;
       int oldIndex = current;
-        int index = current == 1 ? 2 : 1;
+        int index = oldIndex == 1 ? 2 : 1;
       File sourceFile;
       if ( new File( source, "current1" ).exists() ) {
         sourceFile = new File(source, "1");
@@(protected) @@
       }
       else {
         log.error( "Unable to determine current in source directory" );
-          inProgress = false;
         return;
       }
-
       File destinationFile = new File( destination, Integer.valueOf( index ).toString() );
       try {
         log.trace( "Copying {} into {}", sourceFile, destinationFile );
@@(protected) @@
       catch (IOException e) {
         //don't change current
         log.error( "Unable to synchronize " + indexName, e);
-          inProgress = false;
         return;
       }
       if ( ! new File( indexName, "current" + oldIndex ).delete() ) {
@@(protected) @@
       }
     }
     finally {
-        inProgress = false;
+        inProgress.set( false );
     }
     log.trace( "Copy for {} took {} ms", indexName, (System.currentTimeMillis() - start) );
   }
 }

 public void stop() {
+    @SuppressWarnings("unused")
+    int readCurrentState = current; //unneded value, but ensure visibility of state protected by memory barrier
   timer.cancel();
   try {
     directory1.close();

_______________________________________________
hibernate-commits mailing list
hibernate-commits@(protected)
https://lists.jboss.org/mailman/listinfo/hibernate-commits
©2008 gg3721.com - Jax Systems, LLC, U.S.A.