你好,游客 登录
背景:
阅读新闻

大数据框架hadoop之Observe设计模式应用

[日期:2014-12-01] 来源:ITeye-博客   作者: [字体: ]

Observer 观察者设计模式是行为模式的一种,它的作用是当一个对象的状态发生变化时,能够自动通知其他关联对象,自动刷新对象状态

Observer 模式提供给关联对象一种同步通信的手段,使某个对象与依赖它的其他对象之间保持状态同步。如下用代码的形式来展现被观察者(新闻出版社)和观察者(与之关联的订户观察者对象)是如何保持信息同步的。

1       被观察者 - 新闻出版社

/**

* NewsPublisher: 新闻出版社

*/

class NewsPublisher extends Observable {

public void publishNews(String newsTitle, String newsBody) {

News news = new News(newsTitle, newsBody);

setChanged();  // 通过 setChanged() 方法标明对象的状态已发生变化

this.notifyObservers(news);  // 通知各 Observer ,并发送一个名为 news 对象的消息

// ... ...

}

}

2       观察者 - 订户观察者

/**

* 订户观察者。

* Created by myuser on 2014/11/29.

*/

public class SubscriberObserver implements Observer {

// 新闻出版社调用 notifyObservers(news) 方法,自动调用如下方法以保持信息同步。

public void update(Observable observee, Object param) {

if (param instanceof News) {

mail2Subscriber((News)param);

}

}

private void mail2Subscriber(News news) {

System.out.println("Mail to subscriber. A news published with title:" + news.getTitle());

}

}

3       构造者

被观察者调用 notifyObservers() 方法后,为什么观察者就能接收到呢?那是因为有构造者这个角色,它将观察者添加到被观察者的依赖对象里面,代码如下:

public class Client {

/**

* Test Observer Pattern

*/

public static void main(String[] args) {

NewsPublisher publisher = new NewsPublisher();

// 添加订户观察者依赖对象

publisher.addObserver(new SubscriberObserver());

// 发布新闻,触发通知事件

publisher.publishNews("Hello news", "news body");

}

}

上面是一个简单的 Observer 观察者设计模式的实例。接下来看看大数据框架 hadoop 是如何应用该设计模式的。

应用场景如下: JobTracker 收到作业后,并不会马上对其初始化,而是交给调度器,由它按照一定的策略对作业进行初始化。

4       被观察者 -JobTracker

JobTracker 进行作业添加(执行 addJob() 方法)时,会同步该消息到对应的观察者( JobInProgressListener )那里,代码如下:

public class JobTracker {

  private final List<JobInProgressListener> jobInProgressListeners =

new CopyOnWriteArrayList<JobInProgressListener>();

  private synchronized JobStatus addJob(JobID jobId, JobInProgress

job) {

  ... ...

for (JobInProgressListener listener : jobInProgressListeners ) {

listener .jobAdded( job );   // 通知各 Observer ,并发送 job 消息

}

... ...

}

}

5       观察者 -JobQueueJobInProgressListener

class JobQueueJobInProgressListener extends JobInProgressListener {

  private Map<JobSchedulingInfo, JobInProgress> jobQueue ;

@Override

public void jobAdded (JobInProgress job ) {

// 将作业添加到作业队列里。

jobQueue .put( new JobSchedulingInfo( job .getStatus()), job );

}

}

6       构造者

接下来看一下 JobTracker JobQueueJobInProgressListener 的依赖关系是如何建立起来的。

先看一下 JobTracker 类的一些信息,代码如下所示:

public class JobTracker implements MRConstants, InterTrackerProtocol,

JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol,

RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol,

JobTrackerMXBean {

// 该类实例化的时候,会从配置文件的属性 mapred.jobtracker.taskScheduler 获取调度器的类名,然后实例化一个调度器作为 JobTracker 的一个属性。代码如下所示:

JobTracker( final JobConf conf , String identifier , Clock clock , QueueManager qm ) {

... ...

Class<? extends TaskScheduler> schedulerClass

= conf .getClass( "mapred.jobtracker.taskScheduler" ,

JobQueueTaskScheduler. class , TaskScheduler. class );

taskScheduler = (TaskScheduler) ReflectionUtils. newInstance ( schedulerClass , conf );

... ...

}

// JobTracker 类被运行的时候会去调用 startTracker () 静态方法和 offerService() ,代码如下所示:

  public static void main(String argv []

) throws IOException, InterruptedException {

... ...

JobTracker tracker = startTracker ( new JobConf());

tracker .offerService();

... ...

}

// startTracker () 静态方法首先实例一个 JobTracker 类,然后将当前实例赋给调度器的 t askTrackerManager 属性。

public static JobTracker startTracker (JobConf conf , String identifier )

throws IOException, InterruptedException {

... ...

result = new JobTracker( conf , identifier );

result . taskScheduler .setTaskTrackerManager( result );

... ...

return result ;

}

// offerService() 方法调用调度器的 start() 方法。

  public void offerService() throws InterruptedException, IOException {

... ...

taskScheduler .start();

... ...

}

}

// 如下是调度器的 start() 方法,该方法将 JobQueueJobInProgressListener 类添加到 JobTracker 的依赖属性里,也即构造了 JobTracker JobQueueJobInProgressListener 的被观察者与观察者关系。

class JobQueueTaskScheduler extends TaskScheduler {

public JobQueueTaskScheduler() {

this . jobQueueJobInProgressListener = new JobQueueJobInProgressListener();

}

  public synchronized void start() throws IOException {

... ...

taskTrackerManager . addJobInProgressListener ( jobQueueJobInProgressListe ner );

... ...

}

}

原文链接:http://seandeng888.iteye.com/blog/2161824





收藏 推荐 打印 | 录入: | 阅读:
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款