Observer 观察者设计模式是行为模式的一种,它的作用是当一个对象的状态发生变化时,能够自动通知其他关联对象,自动刷新对象状态
Observer 模式提供给关联对象一种同步通信的手段,使某个对象与依赖它的其他对象之间保持状态同步。如下用代码的形式来展现被观察者(新闻出版社)和观察者(与之关联的订户观察者对象)是如何保持信息同步的。
1 被观察者 - 新闻出版社
/**
* NewsPublisher: 新闻出版社
*/
class NewsPublisher extends Observable {
public void publishNews(String newsTitle, String newsBody) {
News news = new News(newsTitle, newsBody);
setChanged(); // 通过 setChanged() 方法标明对象的状态已发生变化
this.notifyObservers(news); // 通知各 Observer ,并发送一个名为 news 对象的消息
// ... ...
}
}
2 观察者 - 订户观察者
/**
* 订户观察者。
* Created by myuser on 2014/11/29.
*/
public class SubscriberObserver implements Observer {
// 新闻出版社调用 notifyObservers(news) 方法,自动调用如下方法以保持信息同步。
public void update(Observable observee, Object param) {
if (param instanceof News) {
mail2Subscriber((News)param);
}
}
private void mail2Subscriber(News news) {
System.out.println("Mail to subscriber. A news published with title:" + news.getTitle());
}
}
3 构造者
被观察者调用 notifyObservers() 方法后,为什么观察者就能接收到呢?那是因为有构造者这个角色,它将观察者添加到被观察者的依赖对象里面,代码如下:
public class Client {
/**
* Test Observer Pattern
*/
public static void main(String[] args) {
NewsPublisher publisher = new NewsPublisher();
// 添加订户观察者依赖对象
publisher.addObserver(new SubscriberObserver());
// 发布新闻,触发通知事件
publisher.publishNews("Hello news", "news body");
}
}
上面是一个简单的 Observer 观察者设计模式的实例。接下来看看大数据框架 hadoop 是如何应用该设计模式的。
应用场景如下: JobTracker 收到作业后,并不会马上对其初始化,而是交给调度器,由它按照一定的策略对作业进行初始化。
4 被观察者 -JobTracker
JobTracker 进行作业添加(执行 addJob() 方法)时,会同步该消息到对应的观察者( JobInProgressListener )那里,代码如下:
public class JobTracker {
private final List<JobInProgressListener> jobInProgressListeners =
new CopyOnWriteArrayList<JobInProgressListener>();
private synchronized JobStatus addJob(JobID jobId, JobInProgress
job) {
... ...
for (JobInProgressListener listener : jobInProgressListeners ) {
listener .jobAdded( job ); // 通知各 Observer ,并发送 job 消息
}
... ...
}
}
5 观察者 -JobQueueJobInProgressListener
class JobQueueJobInProgressListener extends JobInProgressListener {
private Map<JobSchedulingInfo, JobInProgress> jobQueue ;
@Override
public void jobAdded (JobInProgress job ) {
// 将作业添加到作业队列里。
jobQueue .put( new JobSchedulingInfo( job .getStatus()), job );
}
}
6 构造者
接下来看一下 JobTracker 和 JobQueueJobInProgressListener 的依赖关系是如何建立起来的。
先看一下 JobTracker 类的一些信息,代码如下所示:
public class JobTracker implements MRConstants, InterTrackerProtocol,
JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol,
RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol,
JobTrackerMXBean {
// 该类实例化的时候,会从配置文件的属性 mapred.jobtracker.taskScheduler 获取调度器的类名,然后实例化一个调度器作为 JobTracker 的一个属性。代码如下所示:
JobTracker( final JobConf conf , String identifier , Clock clock , QueueManager qm ) {
... ...
Class<? extends TaskScheduler> schedulerClass
= conf .getClass( "mapred.jobtracker.taskScheduler" ,
JobQueueTaskScheduler. class , TaskScheduler. class );
taskScheduler = (TaskScheduler) ReflectionUtils. newInstance ( schedulerClass , conf );
... ...
}
// JobTracker 类被运行的时候会去调用 startTracker () 静态方法和 offerService() ,代码如下所示:
public static void main(String argv []
) throws IOException, InterruptedException {
... ...
JobTracker tracker = startTracker ( new JobConf());
tracker .offerService();
... ...
}
// startTracker () 静态方法首先实例一个 JobTracker 类,然后将当前实例赋给调度器的 t askTrackerManager 属性。
public static JobTracker startTracker (JobConf conf , String identifier )
throws IOException, InterruptedException {
... ...
result = new JobTracker( conf , identifier );
result . taskScheduler .setTaskTrackerManager( result );
... ...
return result ;
}
// offerService() 方法调用调度器的 start() 方法。
public void offerService() throws InterruptedException, IOException {
... ...
taskScheduler .start();
... ...
}
}
// 如下是调度器的 start() 方法,该方法将 JobQueueJobInProgressListener 类添加到 JobTracker 的依赖属性里,也即构造了 JobTracker 和 JobQueueJobInProgressListener 的被观察者与观察者关系。
class JobQueueTaskScheduler extends TaskScheduler {
public JobQueueTaskScheduler() {
this . jobQueueJobInProgressListener = new JobQueueJobInProgressListener();
}
public synchronized void start() throws IOException {
... ...
taskTrackerManager . addJobInProgressListener ( jobQueueJobInProgressListe ner );
... ...
}
}
原文链接:http://seandeng888.iteye.com/blog/2161824