ObservingCache - scheduled task executor that returns a result


Problem

I wrote a utility class called ObservingCache. It receives a task in the form of a Supplier<T> and an interval, runs the task every [interval] milliseconds, and exposes the latest result through the getItems() method.
I use it to keep the app in sync with outside resources (such as configuration files or DB tables), so that they can be modified without restarting the application.

The code is as follows:

public class ObservingCache<T> {
  private static final int DEFAULT_CACHE_REFRESH_INTERVAL = 10 * 60 * 1000; // 10 minutes
  private static final int DEFAULT_THREAD_POOL_SIZE = 10;

  private static volatile ScheduledExecutorService executor;

  protected T items;

  public ObservingCache(Supplier<T> syncFunc) {
      this(syncFunc, DEFAULT_CACHE_REFRESH_INTERVAL, true);
  }

  public ObservingCache(Supplier<T> syncFunc, boolean firstRunBlocking) {
      this(syncFunc, DEFAULT_CACHE_REFRESH_INTERVAL, firstRunBlocking);
  }

  public ObservingCache(Supplier<T> syncFunc, int intervalMillis) {
      this(syncFunc, intervalMillis, true);
  }

  public ObservingCache(Supplier<T> syncFunc, int intervalMillis, boolean firstRunBlocking) {
      initExecutor();
      Runnable task = () -> {
          T result = syncFunc.get();
          if (result != null) {
              items = result;
          }
      };
      if (firstRunBlocking) {         
          task.run(); // First run is blocking (saves a lot of trouble later).
      }
      executor.scheduleAtFixedRate(task, firstRunBlocking ? intervalMillis : 0, intervalMillis, TimeUnit.MILLISECONDS);
  }

  private void initExecutor() {
      if (executor == null || executor.isShutdown()) {
          synchronized (this) {
              if (executor == null || executor.isShutdown()) {
                  executor = Executors.newScheduledThreadPool(DEFAULT_THREAD_POOL_SIZE);
              }
          }
      }
  }

  public T getItems() {
      return items;
  }
}


I'd really like to hear your opinion about this.

Solution

Class is open to modification

ObservingCache should be closed for modification; this is the open/closed principle (OCP). If it isn't, its behaviour can be altered (through inheritance, for example), which can lead to unpredictable behaviour and makes the class difficult to test and maintain.

How to achieve that? Mark ObservingCache final.

Executor is never shut down

Once started, the executor can never be shut down, so the task keeps running indefinitely.

How to fix that? Add a method:

/**
 * Once called, the cached value won't be updated anymore.
 */
public void stopObserving() {
    executor.shutdownNow();
}
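
To illustrate what that call does, here is a minimal sketch that assumes a plain ScheduledExecutorService and a counter standing in for the cache refresh. The class name StopObservingDemo and the method stopsAfterShutdown are hypothetical names chosen for this example only.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class StopObservingDemo {
    /** Returns true if no further runs happen once the executor has been shut down. */
    public static boolean stopsAfterShutdown() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger(); // stands in for the cache refresh
        executor.scheduleAtFixedRate(runs::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(100);                              // let the task run a few times
            executor.shutdownNow();                         // the call stopObserving() makes
            executor.awaitTermination(1, TimeUnit.SECONDS); // wait for an in-flight run, if any
            int runsAtShutdown = runs.get();
            Thread.sleep(100);                              // a live schedule would add runs here
            return runsAtShutdown > 0 && runs.get() == runsAtShutdown;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();             // keep the interrupt flag for callers
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();                         // harmless if already shut down
        }
    }

    public static void main(String[] args) {
        System.out.println("stopped: " + stopsAfterShutdown());
    }
}
```

Note that in the original class the executor is a static field shared by every cache, so shutting it down there would stop all caches at once. A per-instance executor avoids that.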


firstRunBlocking

This parameter is the thing that bothers me the most in your class, and also the one that gave me the most trouble when trying to remove it.

Why does it bother me? Every time you have a boolean parameter in a method (or constructor), it's a sign of poor design, because the class or method now has to do two things (one for each value of the boolean), which violates the single responsibility principle. The conditional logic makes your code harder to test and to maintain.

If I understand correctly, the purpose of this flag is to prevent client code from retrieving null when calling getItem before the cache has been updated at least once. I would rather solve this problem by blocking in getItem until the cache has been computed once. Java provides CountDownLatch, a thread-safe class that can fulfil this responsibility by blocking only until the cache is computed for the first time.

I came up with the following wrapper to achieve that:

public class BlockingHolder<T> {
    private T value;
    private final CountDownLatch barrier = new CountDownLatch(1);

    public T get() {
        try {
            barrier.await();
            return value;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void set(T value) {
        this.value = value;
        barrier.countDown();
    }
}
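
The blocking behaviour of such a holder can be sketched as follows. This is an illustration under assumptions, not the answer's exact code: the class BlockingHolderDemo and the method readAfterDelayedWrite are hypothetical, and the volatile modifier is an addition of this sketch. CountDownLatch only documents a happens-before guarantee until the count reaches zero, so the modifier is there to make later set() calls visible to readers as well.

```java
import java.util.concurrent.CountDownLatch;

public class BlockingHolderDemo {
    /** Same idea as the wrapper above; 'volatile' is an addition of this sketch. */
    static final class BlockingHolder<T> {
        private volatile T value; // volatile so that later set() calls are visible to readers
        private final CountDownLatch barrier = new CountDownLatch(1);

        T get() {
            try {
                barrier.await(); // returns immediately once the latch has reached zero
                return value;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag for callers
                throw new IllegalStateException(e);
            }
        }

        void set(T value) {
            this.value = value;
            barrier.countDown(); // has no effect after the first call
        }
    }

    /** Starts a writer that publishes after a delay and returns what the blocked reader sees. */
    public static String readAfterDelayedWrite() {
        BlockingHolder<String> holder = new BlockingHolder<>();
        Thread writer = new Thread(() -> {
            try {
                Thread.sleep(50); // simulate a slow first computation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            holder.set("loaded");
        });
        writer.start();
        return holder.get(); // blocks until the writer has called set()
    }

    public static void main(String[] args) {
        System.out.println(readAfterDelayedWrite()); // prints "loaded"
    }
}
```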


Final solution

Putting these remarks together, I came up with the following solution. Note that BlockingHolder is a private nested class because it doesn't need to be known to the outside world.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public final class ObservingCache<T> {
    private final BlockingHolder<T> holder;
    private final ScheduledExecutorService executor;

    /**
     * The cache will be refreshed every 10 minutes
     */
    public ObservingCache(Supplier<? extends T> syncFunc) {
        this(syncFunc, 10 * 60 * 1000);
    }

    public ObservingCache(Supplier<? extends T> syncFunc, int refreshIntervalMillis) {
        this.holder = new BlockingHolder<>();
        this.executor = Executors.newScheduledThreadPool(1);
        executor.scheduleAtFixedRate(() -> holder.set(syncFunc.get()), 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Blocks until the cached value has been computed at least once
     */
    public T getItem() {
        return holder.get();
    }

    /**
     * Once called, the cached value won't be updated anymore.
     */
    public void stopObserving() {
        executor.shutdownNow();
    }

    private static class BlockingHolder<T> {
        private T value;
        private final CountDownLatch barrier = new CountDownLatch(1);

        public T get() {
            try {
                barrier.await();
                return value;
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        public void set(T value) {
            this.value = value;
            barrier.countDown();
        }
    }
}
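
A possible way to use such a cache is sketched below. To keep the example compilable on its own, it carries a condensed copy of the class with the holder inlined into two fields. The volatile modifier, the class name ObservingCacheUsage, and the counter standing in for an outside resource are assumptions of this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ObservingCacheUsage {
    /** Condensed copy of the class above, with the holder inlined into two fields. */
    static final class ObservingCache<T> {
        private volatile T value; // volatile is an addition of this sketch
        private final CountDownLatch firstValue = new CountDownLatch(1);
        private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

        ObservingCache(Supplier<? extends T> syncFunc, int refreshIntervalMillis) {
            executor.scheduleAtFixedRate(() -> {
                value = syncFunc.get();
                firstValue.countDown();
            }, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
        }

        T getItem() {
            try {
                firstValue.await(); // blocks only until the first refresh has run
                return value;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        void stopObserving() {
            executor.shutdownNow();
        }
    }

    /** Returns true if getItem() delivers a first value and a newer one after a few refreshes. */
    public static boolean seesRefreshedValue() {
        AtomicInteger version = new AtomicInteger(); // stands in for a config file or DB table
        Supplier<Integer> loadVersion = version::incrementAndGet;
        ObservingCache<Integer> cache = new ObservingCache<>(loadVersion, 10);
        try {
            int first = cache.getItem(); // blocks until the first refresh has run
            Thread.sleep(200);           // let several refreshes happen
            int later = cache.getItem(); // returns without waiting from now on
            return first >= 1 && later > first;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            cache.stopObserving();       // otherwise the executor thread keeps the JVM alive
        }
    }

    public static void main(String[] args) {
        System.out.println("refreshed: " + seesRefreshedValue());
    }
}
```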


Update: taking into account firstRunBlocking

This is how I would fully replace the boolean with dedicated classes. It looks like overkill, but in reality it's not: each class has a single responsibility and is testable and maintainable.

ObservingCache is now abstract because the "blocking" behaviour can't be defined here, only in subclasses.

```
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public abstract class ObservingCache<T> {
    /**
     * 10 minutes
     */
    public static final int DEFAULT_CACHE_REFRESH_INTERVAL = 10 * 60 * 1000;

    private final ScheduledExecutorService executor;

    public ObservingCache(Supplier<? extends T> syncFunc, int refreshIntervalMillis) {
        this.executor = Executors.newScheduledThreadPool(1);
        executor.scheduleAtFixedRate(() -> setItem(syncFunc.get()), 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
    }

    public abstract T getItem();

    protected abstract void setItem(T value);

    /**
     * Once called, the cached value won't be updated anymore.
     */
    public final void stopObserving() {
        executor.shutdownNow();
    }
}
```
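
As a hedged illustration of the non-blocking side of this split, here is a sketch of what such a subclass could look like. NonBlockingCache is a hypothetical name that does not come from the answer, and the base class is repeated in condensed form so that the example compiles on its own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class NonBlockingCacheSketch {
    /** Condensed copy of the abstract base class, so the example compiles on its own. */
    abstract static class ObservingCache<T> {
        private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

        ObservingCache(Supplier<? extends T> syncFunc, int refreshIntervalMillis) {
            executor.scheduleAtFixedRate(() -> setItem(syncFunc.get()), 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
        }

        public abstract T getItem();

        protected abstract void setItem(T value);

        public final void stopObserving() {
            executor.shutdownNow();
        }
    }

    /** Hypothetical subclass: getItem() never blocks and returns null until the first refresh. */
    static final class NonBlockingCache<T> extends ObservingCache<T> {
        private volatile T value; // deliberately has no initializer, see the note below

        NonBlockingCache(Supplier<? extends T> syncFunc, int refreshIntervalMillis) {
            super(syncFunc, refreshIntervalMillis);
        }

        @Override
        public T getItem() {
            return value;
        }

        @Override
        protected void setItem(T value) {
            this.value = value;
        }
    }

    /** Returns true if the cache yields null before the first refresh and "loaded" after it. */
    public static boolean nullThenLoaded() {
        CountDownLatch release = new CountDownLatch(1);
        Supplier<String> slowLoad = () -> {
            try {
                release.await(); // hold back the first refresh until the caller allows it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "loaded";
        };
        NonBlockingCache<String> cache = new NonBlockingCache<>(slowLoad, 10);
        try {
            String before = cache.getItem(); // the first refresh is still held back
            release.countDown();
            String after = cache.getItem();
            for (int i = 0; i < 500 && after == null; i++) {
                Thread.sleep(10);            // poll until the first refresh has landed
                after = cache.getItem();
            }
            return before == null && "loaded".equals(after);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            cache.stopObserving();
        }
    }

    public static void main(String[] args) {
        System.out.println(nullThenLoaded());
    }
}
```

The field has no initializer because the base constructor schedules the first refresh before the subclass constructor body and field initializers have run. A subclass field that is assigned after the super(...) call may therefore still be null when the first setItem arrives.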

Code Snippets

/**
 * Once called, the cached value won't be updated anymore.
 */
public void stopObserving() {
    executor.shutdownNow();
}

public class BlockingHolder<T> {
    private T value;
    private final CountDownLatch barrier = new CountDownLatch(1);

    public T get() {
        try {
            barrier.await();
            return value;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void set(T value) {
        this.value = value;
        barrier.countDown();
    }
}

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public final class ObservingCache<T> {
    private final BlockingHolder<T> holder;
    private final ScheduledExecutorService executor;

    /**
     * The cache will be refreshed every 10 minutes
     */
    public ObservingCache(Supplier<? extends T> syncFunc) {
        this(syncFunc, 10 * 60 * 1000);
    }

    public ObservingCache(Supplier<? extends T> syncFunc, int refreshIntervalMillis) {
        this.holder = new BlockingHolder<>();
        this.executor = Executors.newScheduledThreadPool(1);
        executor.scheduleAtFixedRate(() -> holder.set(syncFunc.get()), 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Blocks until the cached value has been computed at least once
     */
    public T getItem() {
        return holder.get();
    }

    /**
     * Once called, the cached value won't be updated anymore.
     */
    public void stopObserving() {
        executor.shutdownNow();
    }

    private static class BlockingHolder<T> {
        private T value;
        private final CountDownLatch barrier = new CountDownLatch(1);

        public T get() {
            try {
                barrier.await();
                return value;
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        public void set(T value) {
            this.value = value;
            barrier.countDown();
        }
    }
}

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public abstract class ObservingCache<T> {
    /**
     * 10 minutes
     */
    public static final int DEFAULT_CACHE_REFRESH_INTERVAL = 10 * 60 * 1000;

    private final ScheduledExecutorService executor;

    public ObservingCache(Supplier<? extends T> syncFunc, int refreshIntervalMillis) {
        this.executor = Executors.newScheduledThreadPool(1);
        executor.scheduleAtFixedRate(() -> setItem(syncFunc.get()), 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
    }

    public abstract T getItem();

    protected abstract void setItem(T value);

    /**
     * Once called, the cached value won't be updated anymore.
     */
    public final void stopObserving() {
        executor.shutdownNow();
    }
}

import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;

public final class BlockingCache<T> extends ObservingCache<T> {
    private final BlockingHolder<T> holder;

    public BlockingCache(Supplier<? extends T> syncFunc, int refreshIntervalMillis) {
        super(syncFunc, refreshIntervalMillis);
        this.holder = new BlockingHolder<>();
    }

    /**
     * Blocks until the cached value has been computed at least once
     */
    @Override
    public T getItem() {
        return holder.get();
    }

    @Override
    protected void setItem(T value) {
        holder.set(value);
    }

    private static class BlockingHolder<T> {
        private T value;
        private final CountDownLatch barrier = new CountDownLatch(1);

        public T get() {
            try {
                barrier.await();
                return value;
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        public void set(T value) {
            this.value = value;
            barrier.countDown();
        }
    }
}

Context

StackExchange Code Review Q#139629, answer score: 2
