1. 빅데이터 정의

데이터 규모 측면

  • 현재의 방식으로 저장, 관리, 분석할 수 있는 범위를 초과하는 데이터

업무 프로세스 측면

  • 저렴한 비용으로 대규모 데이터의 초고속 수집, 발굴, 융합, 분석과 가치 추출을 지원할 수 있도록 고안된 차세대 기술

빅데이터는 대용량 데이터가 아닌 다차원적으로 엄청난 데이터

  • 크기가 큰 것만이 아니라 관리와 분석의 종합적인 어려움 및 복잡성을 의미

일반적으로 3V = Volume + Variety + Velocity 측면으로 빅데이터를 정의

데이터 처리를 위해 필요한 인력/조직 및 기술까지 포함하는 넓은 의미로도 사용

2. 빅데이터 기술

Semantic Text Mining Engine

3. 빅데이터 S/W

3.1 빅데이터 수집 S/W

비정형 데이터 수집 도구

  • Nutch : 오픈소스 웹 검색 소프트웨어로 웹크롤로의 기능을 제공
  • Flume : 대량의 로그 데이터를 효과적으로 수집하거나 트위터 API를 호출
  • 검색API : 포털(네이버/다음/구글)의 검색 API를 이용 특정 주제의 데이터 수집

정형 데이터 수집 도구

  • Scoop: RDBMS와 HDFS 파일시스템에서 데이터를 주고 받는 기능 제공
  • DB Connector : 대상 DB의 연결 모듈을 개발하여 DB의 데이터를 수집

3.2 빅데이터 구축 S/W

빅데이터 처리 플랫폼

  • Hadoop : 대량의 데이터를 처리할 수 있는 클러스터 환경에서 동작하는 분산응용 프로그램을 지원하는 프레임워크 기능을 제공

빅데이터 저장소

  • HDFS : 수십 테라 또는 페타바이트 이상의 대용량 파일을 분산된 서버에 저장하고 수많은 클라이언트가 저장된 데이터를 빠르게 처리할 수 있게 설계된 파일 시스템
  • NoSQL : 빅데이터의 효과적 저장 및 관리에 필요한 기술, 카산드라(Cassandra DB), 몽고DB 등이 있음
  • Mysql : 효율적인 빅데이터 분석을 위하여 정규화된 데이터를 저장하는 RDBMS

3.3 빅데이터 분석 및 활용 S/W

빅데이터 분석 도구

  • Pro-SINDI : 대량의 데이터에서 단일 핵심어 뿐만 아니라 5개의 복합명사 까지 추출, 연관관계 추출도 가능한 분석 프로젝트에 최적화된 상용 소프트웨어 워드 클라우드, 파이/바/라인 차트 제공 및 온톨로지 연계 가능
  • Mahout : 대량의 데이터에서 용어(핵심어) 및 관계 추출과 분류 및 필터링에 사용되어지는 기계학습 기반의 알고리즘 및 데이터 분석용 도구
  • R : 빅데이터의 통계적 의미를 찾고 그 패턴을 분석하기 위해서 강력한 통계 기능 제공
  • 출처 : http://www.frotoma.com/sub2_9.do







Hadoop Ecosystem은 큰 데이터 문제를 해결하는 플랫폼 또는 프레임 워크입니다. 

내부적으로 여러 가지 서비스 (섭취, 저장, 분석 및 유지)를 포함하는 모음으로 간주 할 수 있습니다.



저장을 위해 우리는 HDFS (Hadoop Distributed Filesystem)를 사용합니다 . HDFS 의 주요 구성 요소 는 NameNode 와 DataNode 입니다.

NameNode

DataNode (슬레이브 노드)를 관리하고 관리하는 마스터 데몬입니다. 클러스터에 저장된 모든 파일의 메타 데이터, 즉 저장된 블록의 위치, 파일 크기, 사용 권한, 계층 구조 등을 기록합니다. 파일 시스템 메타 데이터에 발생하는 모든 변경 사항을 기록합니다.

예를 들어 HDFS에서 파일이 삭제되면 NameNode가 즉시 편집 로그에이 파일을 기록합니다. 클러스터의 모든 데이터 노드로부터 하트 비트 및 블록 보고서를 정기적으로 수신하여 DataNode가 라이브 상태인지 확인합니다. HDFS에있는 모든 블록의 기록을 유지하고 이들 블록이 저장된 노드를 기록합니다.

데이터 노드

이것들은 각 슬레이브 머신에서 실행되는 슬레이브 데몬입니다. 실제 데이터는 DataNodes에 저장됩니다. 이들은 클라이언트의 읽기 및 쓰기 요청을 담당합니다. 또한 NameNode의 결정에 따라 블록 작성, 블록 h 제 및 복제를 담당합니다.

처리를 위해 YARN (Yet Another Resource Negotiator)을 사용합니다. YARN 의 구성 요소 는 ResourceManager 및 NodeManager 입니다.

ResourceManager

이는 클러스터 레벨 (각 클러스터에 하나씩) 구성 요소이며 마스터 시스템에서 실행됩니다. YARN에서 실행되는 리소스를 관리하고 응용 프로그램을 예약합니다.

NodeManager

노드 레벨 구성 요소 (각 노드에 하나씩 있음)이며 각 슬레이브 시스템에서 실행됩니다. 컨테이너 관리 및 각 컨테이너의 리소스 사용률 모니터링을 담당합니다. 또한 노드 상태 및 로그 관리를 추적합니다. ResourceManager와 지속적으로 통신하여 최신 상태를 유지합니다.

따라서 MapReduce를 사용하여 HDFS에서 병렬 처리를 수행 할 수 있습니다.

MapReduce

Hadoop 생태계에서 처리의 핵심 구성 요소로서 처리 논리를 제공합니다. 즉, MapReduce는 Hadoop 환경에서 분산 및 병렬 알고리즘을 사용하여 대용량 데이터 세트를 처리하는 응용 프로그램을 작성하는 데 도움이되는 소프트웨어 프레임 워크입니다. MapReduce 프로그램에서 Map () 및 Reduce () 는 두 가지 함수입니다. Map 함수 는 필터링, 그룹화 및 정렬과 같은 작업을 수행합니다. Reduce 함수가지도 함수에의해 생성 된 결과를 집계하고 요약합니다. Map 함수에 의해 생성 된 결과는 a 키 값 쌍 (K, V)은 Reduce 함수의 입력으로 사용됩니다.

이 비디오를 통해 Hadoop 및 아키텍처를 자세히 이해할 수 있습니다.

" data-yt-id="n3qnsVFNEIU" style="margin-bottom: 1em; position: relative; width: 602px; height: 295px; background-repeat: no-repeat; background-position: center center; cursor: pointer; color: rgb(51, 51, 51); font-family: q_serif, Georgia, Times, "Times New Roman", "Hiragino Kaku Gothic Pro", Meiryo, serif; font-size: 16px; background-image: url("https://img.youtube.com/vi/n3qnsVFNEIU/0.jpg"); background-size: cover !important;">

Hadoop 단일 노드 및 다중 노드 클러스터 설치

그런 다음이 Hadoop 생태계 블로그 를 통해 Hadoop 생태계 를 자세히 학습 할 수 있습니다 .

이 Hadoop 에코 시스템 자습서 비디오를 살펴볼 수도 있습니다.

" data-yt-id="-XkEX1onpEI" style="margin-bottom: 1em; position: relative; width: 602px; height: 295px; background-repeat: no-repeat; background-position: center center; cursor: pointer; color: rgb(51, 51, 51); font-family: q_serif, Georgia, Times, "Times New Roman", "Hiragino Kaku Gothic Pro", Meiryo, serif; font-size: 16px; background-image: url("https://img.youtube.com/vi/-XkEX1onpEI/0.jpg"); background-size: cover !important;">

돼지

PIG는 실행 환경을위한 돼지 라틴어 , 언어 및 돼지 런타임 이라는 두 부분으로 구성 됩니다. Java 및 JVM으로 더 잘 이해할 수 있습니다. 돼지 라틴어를 지원합니다 .

모두가 프로그래밍 배경에 속하지 않으므로. 따라서 Apache PIG는이를 완화합니다. 너는 어떻게 알고 싶어 할지도 몰라?

음, 재미있는 사실을 말씀 드리겠습니다.

돼지 라틴 10 줄 = 약. 200 줄의 Map-Reduce Java 코드

그러나 돼지 작업의 뒷부분에서지도 축소 작업이 실행된다고 말할 때 충격을받지 마십시오. 컴파일러는 내부적으로 돼지 라틴을 MapReduce로 변환합니다. MapReduce 작업의 순차 집합을 생성하며 이는 추상화입니다 (검정 상자와 같이 작동 함). PIG는 처음에 Yahoo가 개발했습니다. ETL (Extract, Transform 및 Load), 거대한 데이터 세트 처리 및 분석을위한 데이터 흐름을 구축 할 수있는 플랫폼을 제공합니다.

" data-yt-id="GG-VRm6XnNk" style="margin-bottom: 1em; position: relative; width: 602px; height: 295px; background-repeat: no-repeat; background-position: center center; cursor: pointer; color: rgb(51, 51, 51); font-family: q_serif, Georgia, Times, "Times New Roman", "Hiragino Kaku Gothic Pro", Meiryo, serif; font-size: 16px; background-image: url("https://img.youtube.com/vi/GG-VRm6XnNk/0.jpg"); background-size: cover !important;">

하이브

Facebook은 SQL에 능통 한 사람들을 위해 HIVE를 만들었습니다. 따라서 HIVE는 하둡 생태계에서 일하는 동안 집에서 느끼게합니다. 기본적으로 HIVE는 SQL과 같은 인터페이스를 사용하여 분산 환경에서 대형 데이터 세트를 읽고, 쓰고 관리하는 데이터웨어 하우징 구성 요소입니다.

HIVE + SQL = HQL

Hive의 쿼리 언어는 SQL과 매우 유사한 HQL (Hive Query Language)이라고합니다. 하이브는 확장 성이 뛰어납니다. 즉, 대용량 데이터 집합 처리 (즉, 일괄 처리 쿼리 처리)와 실시간 처리 (즉, 대화 형 쿼리 처리)라는 두 가지 목적을 모두 수행 할 수 있습니다. Hive는 내부적으로 MapReduce 프로그램으로 변환됩니다.

SQL의 모든 기본 데이터 유형을 지원합니다. 사전 정의 된 기능을 사용하거나 사용자의 특정 요구를 충족시키기 위해 맞춤형 사용자 정의 함수 (UDF)를 작성할 수 있습니다.

" data-yt-id="tKNGB5IZPFE" style="margin-bottom: 1em; position: relative; width: 602px; height: 295px; background-repeat: no-repeat; background-position: center center; cursor: pointer; color: rgb(51, 51, 51); font-family: q_serif, Georgia, Times, "Times New Roman", "Hiragino Kaku Gothic Pro", Meiryo, serif; font-size: 16px; background-image: url("https://img.youtube.com/vi/tKNGB5IZPFE/0.jpg"); background-size: cover !important;">

요구 사항에 따라 HBase에 데이터를 저장할 수 있습니다.

HBase

HBase는 오픈 소스, 비 관계형 분산 데이터베이스입니다. 즉, NoSQL 데이터베이스입니다. 모든 유형의 데이터를 지원하므로 Hadoop 에코 시스템 내부의 모든 것을 처리 할 수 ​​있습니다. 대규모 데이터 세트를 처리하도록 설계된 분산 형 스토리지 시스템 인 Google의 BigTable을 모델로합니다.

HBase는 HDFS 위에서 실행되도록 설계되었으며 BigTable과 같은 기능을 제공합니다. 이는 대부분의 Big Data 사용 사례에서 일반적으로 나타나는 스파 스 데이터를 내결함성있게 저장하는 방법을 제공합니다. HBase는 Java로 작성되지만 HBase 응용 프로그램은 REST, Avro 및 Thrift API로 작성할 수 있습니다.

더 나은 이해를 위해 예를 들어 보겠습니다. 수십억의 고객 이메일을 보유하고 있으며 이메일에 불만 사항을 사용하는 고객의 수를 알아야합니다. 요청은 신속하게 (즉, 실시간으로) 처리되어야합니다. 여기에서는 적은 양의 데이터를 검색하는 동안 큰 데이터 세트를 처리합니다. 이러한 종류의 문제를 해결하기 위해 HBase가 설계되었습니다.

" data-yt-id="NOX6-nDtrFQ" style="margin-bottom: 1em; position: relative; width: 602px; height: 295px; background-repeat: no-repeat; background-position: center center; cursor: pointer; color: rgb(51, 51, 51); font-family: q_serif, Georgia, Times, "Times New Roman", "Hiragino Kaku Gothic Pro", Meiryo, serif; font-size: 16px; background-image: url("https://img.youtube.com/vi/NOX6-nDtrFQ/0.jpg"); background-size: cover !important;">

아파 토플

데이터 수집은 Hadoop 에코 시스템의 중요한 부분입니다. Flume은 비정형 및 반 구조화 된 데이터를 HDFS로 처리하는 데 도움이되는 서비스입니다. 이 솔루션은 신뢰할 수 있고 분산 된 솔루션을 제공 하며 많은 양의 데이터 세트 를 수집, 통합 및 이동 하는 데 도움이됩니다 네트워크 트래픽, 소셜 미디어, 이메일 메시지, 로그 파일 등과 같은 다양한 소스의 온라인 스트리밍 데이터를 HDFS로 가져 오는 데 도움이됩니다.

APACHE SQOOP

다른 데이터 수집 서비스 즉 Sqoop. Flume과 Sqoop의 주된 차이점은 Flume이 비정형 데이터 또는 반 구조화 된 데이터 만 HDFS로 가져 오는 것입니다. Sqoop은 RDBMS 또는 엔터프라이즈 데이터웨어 하우스에서 HDFS로 또는 그 반대로 가져온 구조화 된 데이터를 가져올 수 있습니다.

마지막으로,이 Hadoop 생태계 블로그와 Hadoop 생태계 비디오를 통해 당신에게 깊은 지식을 얻으시기 바랍니다.

" data-yt-id="-XkEX1onpEI" style="margin-bottom: 1em; position: relative; width: 602px; height: 295px; background-repeat: no-repeat; background-position: center center; cursor: pointer; color: rgb(51, 51, 51); font-family: q_serif, Georgia, Times, "Times New Roman", "Hiragino Kaku Gothic Pro", Meiryo, serif; font-size: 16px; background-image: url("https://img.youtube.com/vi/-XkEX1onpEI/0.jpg"); background-size: cover !important;">

Edureka는 Hadoop Tutorial 비디오의 좋은 목록을 제공합니다. 이 Hadoop 튜토리얼 비디오 재생 목록 과 Hadoop Tutorial 블로그 시리즈 를 살펴 보도록 권합니다 학습 내용은 Hadoop 인증에 부합해야합니다 .


Hadoop Ecosystem is a platform or framework which solves big data problems. You can consider it as a suite which encompasses a number of services (ingesting, storing, analyzing and maintaining) inside it.

For storage we use HDFS (Hadoop Distributed Filesystem).The main components of HDFS are NameNode and DataNode.

NameNode

It is the master daemon that maintains and manages the DataNodes (slave nodes). It records the metadata of all the files stored in the cluster, e.g. location of blocks stored, the size of the files, permissions, hierarchy, etc. It records each and every change that takes place to the file system metadata.

For example, if a file is deleted in HDFS, the NameNode will immediately record this in the EditLog. It regularly receives a Heartbeat and a block report from all the DataNodes in the cluster to ensure that the DataNodes are live. It keeps a record of all the blocks in HDFS and in which nodes these blocks are stored.

DataNode

These are slave daemons which runs on each slave machine. The actual data is stored on DataNodes. They are responsible for serving read and write requests from the clients. They are also responsible for creating blocks, deleting blocks and replicating the same based on the decisions taken by the NameNode.

For processing , we use YARN(Yet Another Resource Negotiator). The components of YARN are ResourceManager and NodeManager.

ResourceManager

It is a cluster level (one for each cluster) component and runs on the master machine. It manages resources and schedule applications running on top of YARN.

NodeManager

It is a node level component (one on each node) and runs on each slave machine. It is responsible for managing containers and monitoring resource utilization in each container. It also keeps track of node health and log management. It continuously communicates with ResourceManager to remain up-to-date .

So, you can perform parallel processing on HDFS using MapReduce.

MapReduce

It is the core component of processing in a Hadoop Ecosystem as it provides the logic of processing. In other words, MapReduce is a software framework which helps in writing applications that processes large data sets using distributed and parallel algorithms inside Hadoop environment. In a MapReduce program, Map() and Reduce() are two functions.The Map function performs actions like filtering, grouping and sorting.While Reduce function aggregates and summarizes the result produced by map function.The result generated by the Map function is a key value pair (K, V) which acts as the input for Reduce function.

You can go through this video to understand Hadoop & it’s architecture in detail.

" data-yt-id="n3qnsVFNEIU" style="margin-bottom: 1em; position: relative; width: 602px; height: 295px; background-repeat: no-repeat; background-position: center center; cursor: pointer; color: rgb(51, 51, 51); font-family: q_serif, Georgia, Times, "Times New Roman", "Hiragino Kaku Gothic Pro", Meiryo, serif; font-size: 16px; background-image: url("https://img.youtube.com/vi/n3qnsVFNEIU/0.jpg"); background-size: cover !important;">

Install Hadoop Single Node and Multi Node Cluster

Then you can go through this Hadoop Ecosystem blog to learn Hadoop Ecosystem in detail.

You can also go through this Hadoop Ecosystem tutorial video.

" data-yt-id="-XkEX1onpEI" style="margin-bottom: 1em; position: relative; width: 602px; height: 295px; background-repeat: no-repeat; background-position: center center; cursor: pointer; color: rgb(51, 51, 51); font-family: q_serif, Georgia, Times, "Times New Roman", "Hiragino Kaku Gothic Pro", Meiryo, serif; font-size: 16px; background-image: url("https://img.youtube.com/vi/-XkEX1onpEI/0.jpg"); background-size: cover !important;">

Pig

PIG has two parts: Pig Latin, the language and the pig runtime, for the execution environment. You can better understand it as Java and JVM. It supports pig latin language.

As everyone does not belong from a programming background. So, Apache PIG relieves them. You might be curious to know how?

Well, I will tell you an interesting fact:

10 line of pig latin = approx. 200 lines of Map-Reduce Java code

But don’t be shocked when I say that at the back end of Pig job, a map-reduce job executes. The compiler internally converts pig latin to MapReduce. It produces a sequential set of MapReduce jobs, and that’s an abstraction (which works like black box). PIG was initially developed by Yahoo. It gives you a platform for building data flow for ETL (Extract, Transform and Load), processing and analyzing huge data sets.

" data-yt-id="GG-VRm6XnNk" style="margin-bottom: 1em; position: relative; width: 602px; height: 295px; background-repeat: no-repeat; background-position: center center; cursor: pointer; color: rgb(51, 51, 51); font-family: q_serif, Georgia, Times, "Times New Roman", "Hiragino Kaku Gothic Pro", Meiryo, serif; font-size: 16px; background-image: url("https://img.youtube.com/vi/GG-VRm6XnNk/0.jpg"); background-size: cover !important;">

Hive

Facebook created HIVE for people who are fluent with SQL. Thus, HIVE makes them feel at home while working in a Hadoop Ecosystem. Basically, HIVE is a data warehousing component which performs reading, writing and managing large data sets in a distributed environment using SQL-like interface.

HIVE + SQL = HQL

The query language of Hive is called Hive Query Language(HQL), which is very similar like SQL. Hive is highly scalable. As, it can serve both the purposes, i.e. large data set processing (i.e. Batch query processing) and real time processing (i.e. Interactive query processing). Hive gets internally gets converted into MapReduce programs.

It supports all primitive data types of SQL. You can use predefined functions, or write tailored user defined functions (UDF) also to accomplish your specific needs.

" data-yt-id="tKNGB5IZPFE" style="margin-bottom: 1em; position: relative; width: 602px; height: 295px; background-repeat: no-repeat; background-position: center center; cursor: pointer; color: rgb(51, 51, 51); font-family: q_serif, Georgia, Times, "Times New Roman", "Hiragino Kaku Gothic Pro", Meiryo, serif; font-size: 16px; background-image: url("https://img.youtube.com/vi/tKNGB5IZPFE/0.jpg"); background-size: cover !important;">

You can store data in HBase based on your requirements.

HBase

HBase is an open source, non-relational distributed database. In other words, it is a NoSQL database. It supports all types of data and that is why, it’s capable of handling anything and everything inside a Hadoop ecosystem. It is modelled after Google’s BigTable, which is a distributed storage system designed to cope up with large data sets.

The HBase was designed to run on top of HDFS and provides BigTable like capabilities. It gives us a fault tolerant way of storing sparse data, which is common in most Big Data use cases. The HBase is written in Java, whereas HBase applications can be written in REST, Avro and Thrift APIs.

For better understanding, let us take an example. You have billions of customer emails and you need to find out the number of customers who has used the word complaint in their emails. The request needs to be processed quickly (i.e. at real time). So, here we are handling a large data set while retrieving a small amount of data. For solving these kind of problems, HBase was designed.

" data-yt-id="NOX6-nDtrFQ" style="margin-bottom: 1em; position: relative; width: 602px; height: 295px; background-repeat: no-repeat; background-position: center center; cursor: pointer; color: rgb(51, 51, 51); font-family: q_serif, Georgia, Times, "Times New Roman", "Hiragino Kaku Gothic Pro", Meiryo, serif; font-size: 16px; background-image: url("https://img.youtube.com/vi/NOX6-nDtrFQ/0.jpg"); background-size: cover !important;">

APACHE FLUME

Ingesting data is an important part of our Hadoop Ecosystem. The Flume is a service which helps in ingesting unstructured and semi-structured data into HDFS. It gives us a solution which is reliable and distributed and helps us in collecting, aggregating and moving large amount of data sets. It helps us to ingest online streaming data from various sources like network traffic, social media, email messages, log files etc. in HDFS.

APACHE SQOOP

Another data ingesting service i.e. Sqoop. The major difference between Flume and Sqoop is Flume only ingests unstructured data or semi-structured data into HDFS. While Sqoop can import as well as export structured data from RDBMS or Enterprise data warehouses to HDFS or vice versa.

At last, I would recommend you tu go throiugh this Hadoop ecosystem blog and Hadoop Ecosystem video to get a indepth knowledge.

" data-yt-id="-XkEX1onpEI" style="margin-bottom: 1em; position: relative; width: 602px; height: 295px; background-repeat: no-repeat; background-position: center center; cursor: pointer; color: rgb(51, 51, 51); font-family: q_serif, Georgia, Times, "Times New Roman", "Hiragino Kaku Gothic Pro", Meiryo, serif; font-size: 16px; background-image: url("https://img.youtube.com/vi/-XkEX1onpEI/0.jpg"); background-size: cover !important;">

Edureka provides a good list of Hadoop Tutorial videos. I would recommend you to go through this Hadoop tutorial video playlist as well as Hadoop Tutorial blog series. Your learning should be aligned with Hadoop Certification.



출처 : https://www.quora.com/What-is-a-Hadoop-ecosystem

■ 참조 Hadoop File System Shell Guide.


■ appendToFile : 로컬 시스템의 파일을 기존에 존재하는 hdfs 파일시스템의 파일에 붙인다.

bash$ hadoop fs -appendToFile [로컬 시스템의 파일] [HDFS 파일시스템 디렉토리] /[파일명]

1
2
3
4
5
6
7
8
9
bash$ hadoop fs -touchz /data/test/sample.txt
bash$ hadoop fs -ls -R /data/test
-rw-r--r--   3          0 /data/test/sample.txt
bash$ hadoop fs -usage appendToFile
Usage: hadoop fs [generic options] -appendToFile <localsrc> ... <dst>
bash$ hadoop fs -appendToFile license.txt /data/test/sample.txt
[nextman@centos01 script]$ hadoop fs -ls /data/test
Found 1 items
-rw-r--r--   3      15459 /data/test/sample.txt

 

■ cat : [파일명] 의 내용을 출력한다.
bash$ hadoop fs -cat /data/[파일명]
 

■ chgrp : 파일의 소유 그룹을 변경한다. ‘-R’ 옵션을 추가하면 디렉토리 하위의 모든 디렉토리와 파일의 소유 그룹을 변경한다.
bash$ hadoop fs -chgrp [변경할 그룹] [변경할 디렉토리 혹은 파일]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
bash$ hadoop fs -ls /data/test
Found 2 items
-rw-r--r--   3 nextman supergroup /data/test/sample.log
-rw-r--r--   3 nextman supergroup /data/test/sample.txt
bash$ hadoop fs -chgrp root /data/test/sample.log
bash$ hadoop fs -ls /data/test
Found 2 items
-rw-r--r--   3 nextman root       /data/test/sample.log
-rw-r--r--   3 nextman supergroup /data/test/sample.txt
bash$ hadoop fs -chgrp root /data/test/*
bash$ hadoop fs -ls /data/test
Found 2 items
-rw-r--r--   3 nextman root /data/test/sample.log
-rw-r--r--   3 nextman root /data/test/sample.txt
bash$ hadoop fs -chgrp supergroup /data/test/*
bash$ hadoop fs -ls /data/test
Found 2 items
-rw-r--r--   3 nextman supergroup /data/test/sample.log
-rw-r--r--   3 nextman supergroup /data/test/sample.txt

 

■ chmod : 파일의 모드(drwxrwxrwx)를 변경한다. ‘-R’ 옵션을 사용하면 하위 디렉토리에도 같이 반영된다.
bash$ hadoop fs -chmod [MODE CHANGE RULE] [hdfs 파일시스템 디렉토리 혹은 파일명]

1
2
3
4
5
6
7
8
9
10
11
bash$ hadoop fs -ls /data/test/sample.log
-r--r--r--   3 913785 /data/test/sample.log
bash$ hadoop fs -chmod +w /data/test/sample.log
bash$ hadoop fs -ls /data/test/sample.log
-rw-rw-rw-   3 913785  /data/test/sample.log
bash$ hadoop fs -chmod o-w /data/test/sample.log
bash$ hadoop fs -ls /data/test/sample.log
-rw-rw-r--   3 913785 /data/test/sample.log
bash$ hadoop fs -chmod 755 /data/test/sample.log
bash$ hadoop fs -ls /data/test/sample.log
-rwxr-xr-x   913785 /data/test/sample.log

 

■ chown : 파일의 소유권을 변경한다. ‘-R’ 옵션을 사용하면 하위 디렉토리에도 같이 반영된다.
bash$ hadoop fs -chown [이전할 소유자] [hdfs 파일시스템 디렉토리 혹은 파일명]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
$ hadoop fs -chown root /data/test/sample.log
bash$ hadoop fs -ls /data/test/sample.log
-rwxr-xr-x   3 root root     913785 2016-03-13 17:57 /data/test/sample.log
bash$ hadoop fs -ls /data/test
Found 2 items
-rwxr-xr-x   3 root    root /data/test/sample.log
-rw-r--r--   3 nextman root /data/test/sample.txt
bash$ hadoop fs -ls -R /data/test
-rwxr-xr-x   3 root    root /data/test/sample.log
-rw-r--r--   3 nextman root /data/test/sample.txt
bash$ hadoop fs -ls -R /data
drwxr-xr-x   - nextman root /data/test
-rwxr-xr-x   3 root    root /data/test/sample.log
-rw-r--r--   3 nextman root /data/test/sample.txt
drwxr-xr-x   - nextman root /data/wiki
drwxr-xr-x   - nextman root /data/wiki/input
drwxr-xr-x   - nextman root /data/wordcount
drwxr-xr-x   - nextman root /data/wordcount/input
-rw-r--r--   3 nextman root /data/wordcount/input/sample.txt
drwxr-xr-x   - nextman root /data/wordcount/output

 

■ copyFromLocal : 로컬 파일시스템에서 HDFS 파일 시스템으로 파일 복사(-copyFromLocal), /home/nextman/sample.txt : 로컬 파일 시스템의 디렉토리, /data/wordcount/input : HDFS 파일 시스템의 디렉토리.
bash$ hadoop fs -copyFromLocal [로컬파일시스템 디렉토리]/[파일명] [HDFS 파일시스템 디렉토리]/
bash$ hadoop fs -copyFromLocal /home/nextman/sample.txt /data/wordcount/input
 

■ copyToLocal : HDFS 파일시스템에서 로컬 파일 시스템으로 파일 복사(-copyToLocal), /data/wordcount/input/sample.txt : HDFS 파일 시스템의 파일, /home/nextman/tmp : 로컬 시스템의 파일 디렉토리.
bash$ hadoop fs -copyToLocal [HDFS 파일시스템 디렉토리] /[파일명] [로컬파일시스템 디렉토리] /
bash$ hadoop fs -copyToLocal /data/wordcount/input/sample.txt /home/nextman/tmp
 

■ cp : HDFS 파일시스템내에서 파일을 복사한다. -f 옵션은 목적지 디렉토리에 같은 파일이 있을 경우 덮어 쓴다.
bash$ hadoop fs -cp [HDFS 파일시스템 파일] [HDFS 파일시스템 디렉토리]

 

■ df : 디렉토리의 여유 저장 공간을 알려준다. -h 옵션을 추가하면 사람이 읽기 쉽게 보여준다.
bash$ hadoop fs -df [HDFS 파일시스템 디렉토리]
bash$ hadoop fs -df -h [HDFS 파일시스템 디렉토리]

1
2
3
4
5
6
bash$ hadoop fs -df /data
Filesystem                    Size     Used     Available  Use%
hdfs://centos01:9000  701299851264  6529024  662186717184    0%
bash$ hadoop fs -df -h /data
Filesystem               Size   Used  Available  Use%
hdfs://centos01:9000  653.1 G  6.2 M    616.7 G    0%

 

■ du : 지정한 디렉토리의 하위 디렉토리가 사용하고 있는 데이터의 크기를 알려준다. -h 옵션을 추가하면 사람이 읽기 쉽게 보여주고 -s 옵션은 그 디렉토리 자체(자식포함)의 디스크 사용량을 알려준다.
bash$ hadoop fs -du [HDFS 파일시스템 디렉토리]
bash$ hadoop fs -du -h [HDFS 파일시스템 디렉토리]
bash$ hadoop fs -du -s [HDFS 파일시스템 디렉토리]

1
2
3
4
5
6
7
8
9
10
$ hadoop fs -du /data
1858488  /data/test
0        /data/wiki
47898    /data/wordcount
bash$ hadoop fs -du -s /data
1906386  /data
bash$ hadoop fs -du -h /data
1.8 M   /data/test
0       /data/wiki
46.8 K  /data/wordcount

 

■ dus : 이 명령어는 deprecated되었다. 대신 du -s 명령어를 사용하면 된다.
 

■ get : hdfs 파일시스템의 파일을 로컬 파일시스템으로 복사한다. 아래의 예에서는 hdfs 파일시스템의 ‘/data/test’디렉토리의 ‘sample.txt’ 파일을 로컬 파일시스템의 ‘/home/nextman’ 디렉토리에 ‘xx.txt’라는 파일명으로 복사한다.
bash$ hadoop fs -get [hdfs 파일시스템의 파일] [로컬 파일시스템의 파일]

1
$ hadoop fs -get /data/test/sample.txt /home/nextman/xx.txt

 

■ help : hdfs 파일 시스템 관련 도움말을 확인한다
bash$ hadoop fs -help
 

■ ls -R : hdfs 파일 시스템의 ‘/data’ 디렉토리의 파일 리스트를 조회한다. 2번째처럼 ‘-R’ 옵션을 추가하면 하위 디렉토리를 포함한 전체 파일 리스트를 확인할 수 있다.
bash$ hadoop fs -ls /data
bash$ hadoop fs -ls -R /data
 

■ mkdir : ‘/'(root) 디렉토리 하위에 ‘data’ 디렉토리를 생성한다.
bash$ hadoop fs -mkdir /data
 

■ moveFromLocal : put 명령어와 유사하며 로컬 시스템의 파일이 hdfs파일 시스템으로 복사가 된 후 로컬 시스템의 파일은 삭제된다.
bash$ hadoop fs -moveFromLocal [로컬 파일시스템의 파일] [hdfs 파일시스템의 디렉토리]
 

■ moveToLocal : 아직 구현이 되지 않았다.(not implemented)

1
2
$ hadoop fs -moveToLocal /data/test/sample.txt
moveToLocal: Option '-moveToLocal' is not implemented yet.

 

■ mv : hdfs 파일 시스템내에서 디렉토리 혹은 파일(들)을 이동한다.
bash$ hadoop fs -mv [hdfs 파일시스템의 디렉토리 혹은 파일] [hdfs 파일시스템의 디렉토리]

1
2
3
4
5
6
7
8
9
10
11
12
13
bash$ hadoop fs -ls -R /data
drwxr-xr-x   -          0 2016-03-13 17:57 /data/test
-rw-r--r--   3     913785 2016-03-13 17:57 /data/test/sample.log
-rw-r--r--   3     944703 2016-03-13 17:14 /data/test/sample.txt
drwxr-xr-x   -          0 2016-03-09 07:40 /data/wiki
drwxr-xr-x   -          0 2016-03-09 07:40 /data/wiki/input
bash$ hadoop fs -mv /data/wiki /data/test
bash$ hadoop fs -ls -R /data
drwxr-xr-x   -          0 2016-03-13 18:17 /data/test
-rw-r--r--   3     913785 2016-03-13 17:57 /data/test/sample.log
-rw-r--r--   3     944703 2016-03-13 17:14 /data/test/sample.txt
drwxr-xr-x   -          0 2016-03-09 07:40 /data/test/wiki
drwxr-xr-x   -          0 2016-03-09 07:40 /data/test/wiki/input

 

■ put : 로컬 파일시스템의 파일을 hdfs 파일시스템으로 복사한다. copyFromLocal 명령어와 유사하다.
bash$ hadoop fs -put [로컬시스템의 파일] [hdfs 파일시스템의 디렉토리]

1
hadoop fs -put sample.log /data/test

 

■ rm : [파일명] 을 삭제한다.
bash$ hadoop fs -rm /data/[파일명]
 

■ rm -r : 디렉토리 삭제 명령어. -r 옵션은 삭제하려는 디렉토리에 파일이 있더라도 삭제한다.
bash$ hadoop fs -rm -r /home/data/hadoop/dfs/data/wordcount2
 

■ rmdir : ‘data’ 디렉토리를 삭제한다. 만약 삭제하려는 ‘data’디렉토리 하위에 디렉토리나 파일이 있으면 ‘data’디렉토리를 삭제하지 않는다.
bash$ hadoop fs -rmdir /data
 

■ rmr : 이 명령어는 deprecated 되었다. 대신 rm -r 명령어를 사용하자.
 

■ tail : 파일의 마지막 1 kilobyte를 stdout(표준출력, 콘솔)에 표시한다. ‘-f’ 옵션은 파일이 계속 증가하는 경우에 확인하기 편리하다.
bash$ hadoop fs -tail [HDFS시스템 파일명]
bash$ hadoop fs -tail -f [HDFS시스템 파일명]

1
bash$ hadoop fs -tail -f /data/test/sample.txt

 

■ touchz : 파일 크기가 0인 파일을 생성한다.
bash$ hadoop fs -touchz [생성할 HDFS시스템 파일명]
bash$ hadoop fs -touchz /data/test/sample.txt

1
2
3
bash$ hadoop fs -touchz /data/test/sample.txt
bash$ hadoop fs -ls /data/test/sample.txt
-rw-r--r--   3          0 /data/test/sample.txt

 

■ usage : hadoop fs의 각 명령어에 대한 간단한 사용법을 알 수 있다. 아래는 createSnapshot에 대한 사용법을 확인하고 있다.
bash$ hadoop fs -usage [명령어]

1
2
bash$ hadoop fs -usage createSnapshot
Usage: hadoop fs [generic options] -createSnapshot <snapshotDir> [<snapshotName>]

 
 
 

checksum
count
createSnapshot
deleteSnapshot
expunge

find
getfacl
getfattr
getmerge
renameSnapshot

setfacl
setfattr
setrep
stat
test

text
truncate



출처 : http://blog.iotinfra.net/?p=12

오픈 소스 프레임워크를 활용한 검색엔진 구현



HOJBC0_2015_v19n3_552.pdf



출처 : 송현옥 저술 - ‎2015 - ‎관련 학술자료

'빅데이터 > Hadoop' 카테고리의 다른 글

Hadoop Ecosystem  (0) 2018.08.06
Hadoop fs 명령어 정리  (0) 2018.08.06
hadoop 자주쓰는 명령어 / Wordcount.java / wc.jar 파일  (0) 2018.08.02
Hadoop WordCount v1.0 wc.jar  (0) 2018.08.02
Hadoop 설치 / Hadoop 2.5.2 설치  (0) 2018.08.02



WordCount$IntSumReducer.class

WordCount$TokenizerMapper.class

WordCount.class

WordCount.java

wc.jar

정상적인 시스템 구성인 상태








~/.bashrc PATH 설정및 단축 alias



export HADOOP_HOME=/root/hadoop

export PATH=$PATH:$HADOOP_HOME/bin


unalias fs &> /dev/null

alias fs="hadoop fs"

unalias hls &> /dev/null

alias hls="fs -ls"








네임노트 포멧

bin / hdfs namenode -format


하둡 디렉토리 생성 (생성시 외부디렉토리부터생성) ** output 이 존재할경우 리듀스 과정 진행하지않음.



hdfs dfs -mkdir /wordcount/input/



디렉토리 삭제



hdfs dfs -rmr /wordcount/input/




분석할 파일 업로드


hadoop fs -put file* /wordcount/input



분석할 대상 파일 보는법 

hdfs dfs -cat /wordcount/input/file01





하둡 jar 통한 분석 시작

bin/hadoop jar wc.jar WordCount /wordcount/input /wordcount/output




분석완료된 파일 보는법

bin/hdfs dfs -cat /wordcount/output/part-r-00000





하둡에서 로컬로 분석파일 꺼내기

hadoop fs -get /wordcount/output/* 



'빅데이터 > Hadoop' 카테고리의 다른 글

Hadoop Ecosystem  (0) 2018.08.06
Hadoop fs 명령어 정리  (0) 2018.08.06
오픈 소스 프레임워크를 활용한 검색엔진 구현  (0) 2018.08.03
Hadoop WordCount v1.0 wc.jar  (0) 2018.08.02
Hadoop 설치 / Hadoop 2.5.2 설치  (0) 2018.08.02

WordCount$IntSumReducer.class

WordCount$TokenizerMapper.class

WordCount.class

WordCount.java

wc.jar


MapReduce Tutorial

Purpose

This document comprehensively describes all user-facing facets of the Hadoop MapReduce framework and serves as a tutorial.

Prerequisites

Ensure that Hadoop is installed, configured and is running. More details:

Overview

Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.

A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them and re-executes the failed tasks.

Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the Hadoop Distributed File System (see HDFS Architecture Guide) are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.

The MapReduce framework consists of a single master ResourceManager, one slave NodeManager per cluster-node, and MRAppMaster per application (see YARN Architecture Guide).

Minimally, applications specify the input/output locations and supply map and reduce functions via implementations of appropriate interfaces and/or abstract-classes. These, and other job parameters, comprise the job configuration.

The Hadoop job client then submits the job (jar/executable etc.) and configuration to the ResourceManager which then assumes the responsibility of distributing the software/configuration to the slaves, scheduling tasks and monitoring them, providing status and diagnostic information to the job-client.

Although the Hadoop framework is implemented in Java™, MapReduce applications need not be written in Java.

  • Hadoop Streaming is a utility which allows users to create and run jobs with any executables (e.g. shell utilities) as the mapper and/or the reducer.
  • Hadoop Pipes is a SWIG-compatible C++ API to implement MapReduce applications (non JNI™ based).

Inputs and Outputs

The MapReduce framework operates exclusively on <key, value> pairs, that is, the framework views the input to the job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job, conceivably of different types.

The key and value classes have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework.

Input and Output types of a MapReduce job:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

Example: WordCount v1.0

Before we jump into the details, lets walk through an example MapReduce application to get a flavour for how they work.

WordCount is a simple application that counts the number of occurrences of each word in a given input set.

This works with a local-standalone, pseudo-distributed or fully-distributed Hadoop installation ( Single Node Setup).

Source Code

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Usage

Assuming environment variables are set as follows:

export JAVA_HOME=/usr/java/default
export PATH=$JAVA_HOME/bin:$PATH
export HADOOP_CLASSPATH=$JAVA_HOME/lib/tools.jar

Compile WordCount.java and create a jar:

$ bin/hadoop com.sun.tools.javac.Main WordCount.java 
$ jar cf wc.jar WordCount*.class

Assuming that:

  • /user/joe/wordcount/input - input directory in HDFS
  • /user/joe/wordcount/output - output directory in HDFS

Sample text-files as input:

$ bin/hdfs dfs -ls /user/joe/wordcount/input/ 
/user/joe/wordcount/input/file01 
/user/joe/wordcount/input/file02

$ bin/hdfs dfs -cat /user/joe/wordcount/input/file01 
Hello World Bye World

$ bin/hdfs dfs -cat /user/joe/wordcount/input/file02 
Hello Hadoop Goodbye Hadoop

Run the application:

$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output

Output:

$ bin/hdfs dfs -cat /user/joe/wordcount/output/part-r-00000

Bye 1 
Goodbye 1 
Hadoop 2 
Hello 2 
World 2

Applications can specify a comma separated list of paths which would be present in the current working directory of the task using the option -files. The -libjars option allows applications to add jars to the classpaths of the maps and reduces. The option -archives allows them to pass comma separated list of archives as arguments. These archives are unarchived and a link with name of the archive is created in the current working directory of tasks. More details about the command line options are available at Commands Guide.

Running wordcount example with -libjars-files and -archives
bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output Here, myarchive.zip will be placed and unzipped into a directory by the name "myarchive.zip".

Users can specify a different symbolic name for files and archives passed through -files and -archives option, using #.

For example, bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output Here, the files dir1/dict.txt and dir2/dict.txt can be accessed by tasks using the symbolic names dict1 and dict2 respectively. The archive mytar.tgz will be placed and unarchived into a directory by the name "tgzdir".

Walk-through

The WordCount application is quite straight-forward.

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }

The Mapper implementation, via the map method, processes one line at a time, as provided by the specified TextInputFormat. It then splits the line into tokens separated by whitespaces, via the StringTokenizer, and emits a key-value pair of < <word>, 1>.

For the given sample input the first map emits: 
< Hello, 1> 
< World, 1> 
< Bye, 1> 
< World, 1>

The second map emits: 
< Hello, 1> 
< Hadoop, 1> 
< Goodbye, 1> 
< Hadoop, 1>

We'll learn more about the number of maps spawned for a given job, and how to control them in a fine-grained manner, a bit later in the tutorial.

    job.setCombinerClass(IntSumReducer.class);

WordCount also specifies a combiner. Hence, the output of each map is passed through the local combiner (which is same as the Reducer as per the job configuration) for local aggregation, after being sorted on the keys.

The output of the first map: 
< Bye, 1> 
< Hello, 1> 
< World, 2>

The output of the second map: 
< Goodbye, 1> 
< Hadoop, 2> 
< Hello, 1>

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }

The Reducer implementation, via the reduce method just sums up the values, which are the occurence counts for each key (i.e. words in this example).

Thus the output of the job is: 
< Bye, 1> 
< Goodbye, 1> 
< Hadoop, 2> 
< Hello, 2> 
< World, 2>

The main method specifies various facets of the job, such as the input/output paths (passed via the command line), key/value types, input/output formats etc., in the Job. It then calls the job.waitForCompletion to submit the job and monitor its progress.

We'll learn more about JobInputFormatOutputFormat and other interfaces and classes a bit later in the tutorial.

MapReduce - User Interfaces

This section provides a reasonable amount of detail on every user-facing aspect of the MapReduce framework. This should help users implement, configure and tune their jobs in a fine-grained manner. However, please note that the javadoc for each class/interface remains the most comprehensive documentation available; this is only meant to be a tutorial.

Let us first take the Mapper and Reducer interfaces. Applications typically implement them to provide the map and reduce methods.

We will then discuss other core interfaces including JobPartitionerInputFormatOutputFormat, and others.

Finally, we will wrap up by discussing some useful features of the framework such as the DistributedCacheIsolationRunner etc.

Payload

Applications typically implement the Mapper and Reducer interfaces to provide the map and reduce methods. These form the core of the job.

Mapper

Mapper maps input key/value pairs to a set of intermediate key/value pairs.

Maps are the individual tasks that transform input records into intermediate records. The transformed intermediate records do not need to be of the same type as the input records. A given input pair may map to zero or many output pairs.

The Hadoop MapReduce framework spawns one map task for each InputSplit generated by the InputFormat for the job.

Overall, Mapper implementations are passed the Job for the job via the Job.setMapperClass(Class) method. The framework then calls map(WritableComparable, Writable, Context) for each key/value pair in the InputSplit for that task. Applications can then override the cleanup(Context) method to perform any required cleanup.

Output pairs do not need to be of the same types as input pairs. A given input pair may map to zero or many output pairs. Output pairs are collected with calls to context.write(WritableComparable, Writable).

Applications can use the Counter to report its statistics.

All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to the Reducer(s) to determine the final output. Users can control the grouping by specifying a Comparator via Job.setGroupingComparatorClass(Class).

The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.

Users can optionally specify a combiner, via Job.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.

The intermediate, sorted outputs are always stored in a simple (key-len, key, value-len, value) format. Applications can control if, and how, the intermediate outputs are to be compressed and the CompressionCodec to be used via the Configuration.

How Many Maps?

The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files.

The right level of parallelism for maps seems to be around 10-100 maps per-node, although it has been set up to 300 maps for very cpu-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.

Thus, if you expect 10TB of input data and have a blocksize of 128MB, you'll end up with 82,000 maps, unless Configuration.set(MRJobConfig.NUM_MAPS, int) (which only provides a hint to the framework) is used to set it even higher.

Reducer

Reducer reduces a set of intermediate values which share a key to a smaller set of values.

The number of reduces for the job is set by the user via Job.setNumReduceTasks(int).

Overall, Reducer implementations are passed the Job for the job via the Job.setReducerClass(Class) method and can override it to initialize themselves. The framework then calls reduce(WritableComparable, Iterable<Writable>, Context) method for each <key, (list of values)> pair in the grouped inputs. Applications can then override the cleanup(Context) method to perform any required cleanup.

Reducer has 3 primary phases: shuffle, sort and reduce.

Shuffle

Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.

Sort

The framework groups Reducer inputs by keys (since different mappers may have output the same key) in this stage.

The shuffle and sort phases occur simultaneously; while map-outputs are being fetched they are merged.

Secondary Sort

If equivalence rules for grouping the intermediate keys are required to be different from those for grouping keys before reduction, then one may specify a Comparator via Job.setSortComparatorClass(Class). Since Job.setGroupingComparatorClass(Class) can be used to control how intermediate keys are grouped, these can be used in conjunction to simulate secondary sort on values.

Reduce

In this phase the reduce(WritableComparable, Iterable<Writable>, Context) method is called for each <key, (list of values)> pair in the grouped inputs.

The output of the reduce task is typically written to the FileSystem via Context.write(WritableComparable, Writable).

Applications can use the Counter to report its statistics.

The output of the Reducer is not sorted.

How Many Reduces?

The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>).

With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing.

Increasing the number of reduces increases the framework overhead, but increases load balancing and lowers the cost of failures.

The scaling factors above are slightly less than whole numbers to reserve a few reduce slots in the framework for speculative-tasks and failed tasks.

Reducer NONE

It is legal to set the number of reduce-tasks to zero if no reduction is desired.

In this case the outputs of the map-tasks go directly to the FileSystem, into the output path set by FileOutputFormat.setOutputPath(Job, Path). The framework does not sort the map-outputs before writing them out to the FileSystem.

Partitioner

Partitioner partitions the key space.

Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or a subset of the key) is used to derive the partition, typically by a hash function. The total number of partitions is the same as the number of reduce tasks for the job. Hence this controls which of the m reduce tasks the intermediate key (and hence the record) is sent to for reduction.

HashPartitioner is the default Partitioner.

Counter

Counter is a facility for MapReduce applications to report its statistics.

Mapper and Reducer implementations can use the Counter to report statistics.

Hadoop MapReduce comes bundled with a library of generally useful mappers, reducers, and partitioners.

Job Configuration

Job represents a MapReduce job configuration.

Job is the primary interface for a user to describe a MapReduce job to the Hadoop framework for execution. The framework tries to faithfully execute the job as described by Job, however:

Job is typically used to specify the Mapper, combiner (if any), PartitionerReducerInputFormatOutputFormat implementations. FileInputFormat indicates the set of input files ( FileInputFormat.setInputPaths(Job, Path...)FileInputFormat.addInputPath(Job, Path)) and (FileInputFormat.setInputPaths(Job, String...)FileInputFormat.addInputPaths(Job, String)) and where the output files should be written ( FileOutputFormat.setOutputPath(Path)).

Optionally, Job is used to specify other advanced facets of the job such as the Comparator to be used, files to be put in the DistributedCache, whether intermediate and/or job outputs are to be compressed (and how), whether job tasks can be executed in a speculative manner ( setMapSpeculativeExecution(boolean))/ setReduceSpeculativeExecution(boolean)), maximum number of attempts per task (setMaxMapAttempts(int)setMaxReduceAttempts(int)) etc.

Of course, users can use Configuration.set(String, String)Configuration.get(String) to set/get arbitrary parameters needed by applications. However, use the DistributedCache for large amounts of (read-only) data.

Task Execution & Environment

The MRAppMaster executes the Mapper/Reducer task as a child process in a separate jvm.

The child-task inherits the environment of the parent MRAppMaster. The user can specify additional options to the child-jvm via the mapreduce.{map|reduce}.java.opts and configuration parameter in the Job such as non-standard paths for the run-time linker to search shared libraries via -Djava.library.path=<> etc. If the mapreduce.{map|reduce}.java.opts parameters contains the symbol @taskid@ it is interpolated with value of taskid of the MapReduce task.

Here is an example with multiple arguments and substitutions, showing jvm GC logging, and start of a passwordless JVM JMX agent so that it can connect with jconsole and the likes to watch child memory, threads and get thread dumps. It also sets the maximum heap-size of the map and reduce child jvm to 512MB & 1024MB respectively. It also adds an additional path to the java.library.path of the child-jvm.

<property>
  <name>mapreduce.map.java.opts</name>
  <value>
    -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
    -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>
    -Xmx1024M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
    -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>
Memory Management

Users/admins can also specify the maximum virtual memory of the launched child-task, and any sub-process it launches recursively, using mapreduce.{map|reduce}.memory.mb. Note that the value set here is a per process limit. The value for mapreduce.{map|reduce}.memory.mbshould be specified in mega bytes (MB). And also the value must be greater than or equal to the -Xmx passed to JavaVM, else the VM might not start.

Note: mapreduce.{map|reduce}.java.opts are used only for configuring the launched child tasks from MRAppMaster. Configuring the memory options for daemons is documented in Configuring the Environment of the Hadoop Daemons.

The memory available to some parts of the framework is also configurable. In map and reduce tasks, performance may be influenced by adjusting parameters influencing the concurrency of operations and the frequency with which data will hit disk. Monitoring the filesystem counters for a job- particularly relative to byte counts from the map and into the reduce- is invaluable to the tuning of these parameters.

Map Parameters

A record emitted from a map will be serialized into a buffer and metadata will be stored into accounting buffers. As described in the following options, when either the serialization buffer or the metadata exceed a threshold, the contents of the buffers will be sorted and written to disk in the background while the map continues to output records. If either buffer fills completely while the spill is in progress, the map thread will block. When the map is finished, any remaining records are written to disk and all on-disk segments are merged into a single file. Minimizing the number of spills to disk can decrease map time, but a larger buffer also decreases the memory available to the mapper.

NameTypeDescription
mapreduce.task.io.sort.mbintThe cumulative size of the serialization and accounting buffers storing records emitted from the map, in megabytes.
mapreduce.map.sort.spill.percentfloatThe soft limit in the serialization buffer. Once reached, a thread will begin to spill the contents to disk in the background.

Other notes

  • If either spill threshold is exceeded while a spill is in progress, collection will continue until the spill is finished. For example, if mapreduce.map.sort.spill.percent is set to 0.33, and the remainder of the buffer is filled while the spill runs, the next spill will include all the collected records, or 0.66 of the buffer, and will not generate additional spills. In other words, the thresholds are defining triggers, not blocking.
  • A record larger than the serialization buffer will first trigger a spill, then be spilled to a separate file. It is undefined whether or not this record will first pass through the combiner.
Shuffle/Reduce Parameters

As described previously, each reduce fetches the output assigned to it by the Partitioner via HTTP into memory and periodically merges these outputs to disk. If intermediate compression of map outputs is turned on, each output is decompressed into memory. The following options affect the frequency of these merges to disk prior to the reduce and the memory allocated to map output during the reduce.

NameTypeDescription
mapreduce.task.io.soft.factorintSpecifies the number of segments on disk to be merged at the same time. It limits the number of open files and compression codecs during merge. If the number of files exceeds this limit, the merge will proceed in several passes. Though this limit also applies to the map, most jobs should be configured so that hitting this limit is unlikely there.
mapreduce.reduce.merge.inmem.thresholdsintThe number of sorted map outputs fetched into memory before being merged to disk. Like the spill thresholds in the preceding note, this is not defining a unit of partition, but a trigger. In practice, this is usually set very high (1000) or disabled (0), since merging in-memory segments is often less expensive than merging from disk (see notes following this table). This threshold influences only the frequency of in-memory merges during the shuffle.
mapreduce.reduce.shuffle.merge.percentfloatThe memory threshold for fetched map outputs before an in-memory merge is started, expressed as a percentage of memory allocated to storing map outputs in memory. Since map outputs that can't fit in memory can be stalled, setting this high may decrease parallelism between the fetch and merge. Conversely, values as high as 1.0 have been effective for reduces whose input can fit entirely in memory. This parameter influences only the frequency of in-memory merges during the shuffle.
mapreduce.reduce.shuffle.input.buffer.percentfloatThe percentage of memory- relative to the maximum heapsize as typically specified in mapreduce.reduce.java.opts- that can be allocated to storing map outputs during the shuffle. Though some memory should be set aside for the framework, in general it is advantageous to set this high enough to store large and numerous map outputs.
mapreduce.reduce.input.buffer.percentfloatThe percentage of memory relative to the maximum heapsize in which map outputs may be retained during the reduce. When the reduce begins, map outputs will be merged to disk until those that remain are under the resource limit this defines. By default, all map outputs are merged to disk before the reduce begins to maximize the memory available to the reduce. For less memory-intensive reduces, this should be increased to avoid trips to disk.

Other notes

  • If a map output is larger than 25 percent of the memory allocated to copying map outputs, it will be written directly to disk without first staging through memory.
  • When running with a combiner, the reasoning about high merge thresholds and large buffers may not hold. For merges started before all map outputs have been fetched, the combiner is run while spilling to disk. In some cases, one can obtain better reduce times by spending resources combining map outputs- making disk spills small and parallelizing spilling and fetching- rather than aggressively increasing buffer sizes.
  • When merging in-memory map outputs to disk to begin the reduce, if an intermediate merge is necessary because there are segments to spill and at least mapreduce.task.io.sort.factor segments already on disk, the in-memory map outputs will be part of the intermediate merge.
Configured Parameters

The following properties are localized in the job configuration for each task's execution:

NameTypeDescription
mapreduce.job.idStringThe job id
mapreduce.job.jarStringjob.jar location in job directory
mapreduce.job.local.dirStringThe job specific shared scratch space
mapreduce.task.idStringThe task id
mapreduce.task.attempt.idStringThe task attempt id
mapreduce.task.is.mapbooleanIs this a map task
mapreduce.task.partitionintThe id of the task within the job
mapreduce.map.input.fileStringThe filename that the map is reading from
mapreduce.map.input.startlongThe offset of the start of the map input split
mapreduce.map.input.lengthlongThe number of bytes in the map input split
mapreduce.task.output.dirStringThe task's temporary output directory

Note: During the execution of a streaming job, the names of the "mapreduce" parameters are transformed. The dots ( . ) become underscores ( _ ). For example, mapreduce.job.id becomes mapreduce_job_id and mapreduce.job.jar becomes mapreduce_job_jar. To get the values in a streaming job's mapper/reducer use the parameter names with the underscores.

Task Logs

The standard output (stdout) and error (stderr) streams and the syslog of the task are read by the NodeManager and logged to ${HADOOP_LOG_DIR}/userlogs.

Distributing Libraries

The DistributedCache can also be used to distribute both jars and native libraries for use in the map and/or reduce tasks. The child-jvm always has its current working directory added to the java.library.path and LD_LIBRARY_PATH. And hence the cached libraries can be loaded via System.loadLibrary or System.load. More details on how to load shared libraries through distributed cache are documented at Native Libraries.

Job Submission and Monitoring

Job is the primary interface by which user-job interacts with the ResourceManager.

Job provides facilities to submit jobs, track their progress, access component-tasks' reports and logs, get the MapReduce cluster's status information and so on.

The job submission process involves:

  1. Checking the input and output specifications of the job.
  2. Computing the InputSplit values for the job.
  3. Setting up the requisite accounting information for the DistributedCache of the job, if necessary.
  4. Copying the job's jar and configuration to the MapReduce system directory on the FileSystem.
  5. Submitting the job to the ResourceManager and optionally monitoring it's status.

Job history files are also logged to user specified directory mapreduce.jobhistory.intermediate-done-dir and mapreduce.jobhistory.done-dir, which defaults to job output directory.

User can view the history logs summary in specified directory using the following command 
$ mapred job -history output.jhist 
This command will print job details, failed and killed tip details. 
More details about the job such as successful tasks and task attempts made for each task can be viewed using the following command 
$ mapred job -history all output.jhist

Normally the user uses Job to create the application, describe various facets of the job, submit the job, and monitor its progress.

Job Control

Users may need to chain MapReduce jobs to accomplish complex tasks which cannot be done via a single MapReduce job. This is fairly easy since the output of the job typically goes to distributed file-system, and the output, in turn, can be used as the input for the next job.

However, this also means that the onus on ensuring jobs are complete (success/failure) lies squarely on the clients. In such cases, the various job-control options are:

Job Input

InputFormat describes the input-specification for a MapReduce job.

The MapReduce framework relies on the InputFormat of the job to:

  1. Validate the input-specification of the job.
  2. Split-up the input file(s) into logical InputSplit instances, each of which is then assigned to an individual Mapper.
  3. Provide the RecordReader implementation used to glean input records from the logical InputSplit for processing by the Mapper.

The default behavior of file-based InputFormat implementations, typically sub-classes of FileInputFormat, is to split the input into logical InputSplit instances based on the total size, in bytes, of the input files. However, the FileSystem blocksize of the input files is treated as an upper bound for input splits. A lower bound on the split size can be set via mapreduce.input.fileinputformat.split.minsize.

Clearly, logical splits based on input-size is insufficient for many applications since record boundaries must be respected. In such cases, the application should implement a RecordReader, who is responsible for respecting record-boundaries and presents a record-oriented view of the logical InputSplit to the individual task.

TextInputFormat is the default InputFormat.

If TextInputFormat is the InputFormat for a given job, the framework detects input-files with the .gz extensions and automatically decompresses them using the appropriate CompressionCodec. However, it must be noted that compressed files with the above extensions cannot be split and each compressed file is processed in its entirety by a single mapper.

InputSplit

InputSplit represents the data to be processed by an individual Mapper.

Typically InputSplit presents a byte-oriented view of the input, and it is the responsibility of RecordReader to process and present a record-oriented view.

FileSplit is the default InputSplit. It sets mapreduce.map.input.file to the path of the input file for the logical split.

RecordReader

RecordReader reads <key, value> pairs from an InputSplit.

Typically the RecordReader converts the byte-oriented view of the input, provided by the InputSplit, and presents a record-oriented to the Mapper implementations for processing. RecordReader thus assumes the responsibility of processing record boundaries and presents the tasks with keys and values.

Job Output

OutputFormat describes the output-specification for a MapReduce job.

The MapReduce framework relies on the OutputFormat of the job to:

  1. Validate the output-specification of the job; for example, check that the output directory doesn't already exist.
  2. Provide the RecordWriter implementation used to write the output files of the job. Output files are stored in a FileSystem.

TextOutputFormat is the default OutputFormat.

OutputCommitter

OutputCommitter describes the commit of task output for a MapReduce job.

The MapReduce framework relies on the OutputCommitter of the job to:

  1. Setup the job during initialization. For example, create the temporary output directory for the job during the initialization of the job. Job setup is done by a separate task when the job is in PREP state and after initializing tasks. Once the setup task completes, the job will be moved to RUNNING state.
  2. Cleanup the job after the job completion. For example, remove the temporary output directory after the job completion. Job cleanup is done by a separate task at the end of the job. Job is declared SUCCEDED/FAILED/KILLED after the cleanup task completes.
  3. Setup the task temporary output. Task setup is done as part of the same task, during task initialization.
  4. Check whether a task needs a commit. This is to avoid the commit procedure if a task does not need commit.
  5. Commit of the task output. Once task is done, the task will commit it's output if required.
  6. Discard the task commit. If the task has been failed/killed, the output will be cleaned-up. If task could not cleanup (in exception block), a separate task will be launched with same attempt-id to do the cleanup.

FileOutputCommitter is the default OutputCommitter. Job setup/cleanup tasks occupy map or reduce containers, whichever is available on the NodeManager. And JobCleanup task, TaskCleanup tasks and JobSetup task have the highest priority, and in that order.

Task Side-Effect Files

In some applications, component tasks need to create and/or write to side-files, which differ from the actual job-output files.

In such cases there could be issues with two instances of the same Mapper or Reducer running simultaneously (for example, speculative tasks) trying to open and/or write to the same file (path) on the FileSystem. Hence the application-writer will have to pick unique names per task-attempt (using the attemptid, say attempt_200709221812_0001_m_000000_0), not just per task.

To avoid these issues the MapReduce framework, when the OutputCommitter is FileOutputCommitter, maintains a special ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} sub-directory accessible via ${mapreduce.task.output.dir} for each task-attempt on the FileSystem where the output of the task-attempt is stored. On successful completion of the task-attempt, the files in the ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} (only) are promoted to ${mapreduce.output.fileoutputformat.outputdir}. Of course, the framework discards the sub-directory of unsuccessful task-attempts. This process is completely transparent to the application.

The application-writer can take advantage of this feature by creating any side-files required in ${mapreduce.task.output.dir} during execution of a task via FileOutputFormat.getWorkOutputPath(Conext), and the framework will promote them similarly for succesful task-attempts, thus eliminating the need to pick unique paths per task-attempt.

Note: The value of ${mapreduce.task.output.dir} during execution of a particular task-attempt is actually ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_{$taskid}, and this value is set by the MapReduce framework. So, just create any side-files in the path returned by FileOutputFormat.getWorkOutputPath(Conext) from MapReduce task to take advantage of this feature.

The entire discussion holds true for maps of jobs with reducer=NONE (i.e. 0 reduces) since output of the map, in that case, goes directly to HDFS.

RecordWriter

RecordWriter writes the output <key, value> pairs to an output file.

RecordWriter implementations write the job outputs to the FileSystem.

Other Useful Features

Submitting Jobs to Queues

Users submit jobs to Queues. Queues, as collection of jobs, allow the system to provide specific functionality. For example, queues use ACLs to control which users who can submit jobs to them. Queues are expected to be primarily used by Hadoop Schedulers.

Hadoop comes configured with a single mandatory queue, called 'default'. Queue names are defined in the mapreduce.job.queuename> property of the Hadoop site configuration. Some job schedulers, such as the Capacity Scheduler, support multiple queues.

A job defines the queue it needs to be submitted to through the mapreduce.job.queuename property, or through the Configuration.set(MRJobConfig.QUEUE_NAME, String) API. Setting the queue name is optional. If a job is submitted without an associated queue name, it is submitted to the 'default' queue.

Counters

Counters represent global counters, defined either by the MapReduce framework or applications. Each Counter can be of any Enum type. Counters of a particular Enum are bunched into groups of type Counters.Group.

Applications can define arbitrary Counters (of type Enum) and update them via Counters.incrCounter(Enum, long) or Counters.incrCounter(String, String, long) in the map and/or reduce methods. These counters are then globally aggregated by the framework.

DistributedCache

DistributedCache distributes application-specific, large, read-only files efficiently.

DistributedCache is a facility provided by the MapReduce framework to cache files (text, archives, jars and so on) needed by applications.

Applications specify the files to be cached via urls (hdfs://) in the Job. The DistributedCache assumes that the files specified via hdfs:// urls are already present on the FileSystem.

The framework will copy the necessary files to the slave node before any tasks for the job are executed on that node. Its efficiency stems from the fact that the files are only copied once per job and the ability to cache archives which are un-archived on the slaves.

DistributedCache tracks the modification timestamps of the cached files. Clearly the cache files should not be modified by the application or externally while the job is executing.

DistributedCache can be used to distribute simple, read-only data/text files and more complex types such as archives and jars. Archives (zip, tar, tgz and tar.gz files) are un-archived at the slave nodes. Files have execution permissions set.

The files/archives can be distributed by setting the property mapreduce.job.cache.{files|archives}. If more than one file/archive has to be distributed, they can be added as comma separated paths. The properties can also be set by APIs Job.addCacheFile(URI)Job.addCacheArchive(URI) and Job.setCacheFiles(URI[])Job.setCacheArchives(URI[]) where URI is of the form hdfs://host:port/absolute-path#link-name. In Streaming, the files can be distributed through command line option -cacheFile/-cacheArchive.

The DistributedCache can also be used as a rudimentary software distribution mechanism for use in the map and/or reduce tasks. It can be used to distribute both jars and native libraries. The Job.addArchiveToClassPath(Path) or Job.addFileToClassPath(Path) api can be used to cache files/jars and also add them to the classpath of child-jvm. The same can be done by setting the configuration properties mapreduce.job.classpath.{files|archives}. Similarly the cached files that are symlinked into the working directory of the task can be used to distribute native libraries and load them.

Private and Public DistributedCache Files

DistributedCache files can be private or public, that determines how they can be shared on the slave nodes.

  • "Private" DistributedCache files are cached in a localdirectory private to the user whose jobs need these files. These files are shared by all tasks and jobs of the specific user only and cannot be accessed by jobs of other users on the slaves. A DistributedCache file becomes private by virtue of its permissions on the file system where the files are uploaded, typically HDFS. If the file has no world readable access, or if the directory path leading to the file has no world executable access for lookup, then the file becomes private.
  • "Public" DistributedCache files are cached in a global directory and the file access is setup such that they are publicly visible to all users. These files can be shared by tasks and jobs of all users on the slaves. A DistributedCache file becomes public by virtue of its permissions on the file system where the files are uploaded, typically HDFS. If the file has world readable access, AND if the directory path leading to the file has world executable access for lookup, then the file becomes public. In other words, if the user intends to make a file publicly available to all users, the file permissions must be set to be world readable, and the directory permissions on the path leading to the file must be world executable.
Profiling

Profiling is a utility to get a representative (2 or 3) sample of built-in java profiler for a sample of maps and reduces.

User can specify whether the system should collect profiler information for some of the tasks in the job by setting the configuration property mapreduce.task.profile. The value can be set using the api Configuration.set(MRJobConfig.TASK_PROFILE, boolean). If the value is set true, the task profiling is enabled. The profiler information is stored in the user log directory. By default, profiling is not enabled for the job.

Once user configures that profiling is needed, she/he can use the configuration property mapreduce.task.profile.{maps|reduces} to set the ranges of MapReduce tasks to profile. The value can be set using the api Configuration.set(MRJobConfig.NUM_{MAP|REDUCE}_PROFILES, String). By default, the specified range is 0-2.

User can also specify the profiler configuration arguments by setting the configuration property mapreduce.task.profile.params. The value can be specified using the api Configuration.set(MRJobConfig.TASK_PROFILE_PARAMS, String). If the string contains a %s, it will be replaced with the name of the profiling output file when the task runs. These parameters are passed to the task child JVM on the command line. The default value for the profiling parameters is -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s.

Debugging

The MapReduce framework provides a facility to run user-provided scripts for debugging. When a MapReduce task fails, a user can run a debug script, to process task logs for example. The script is given access to the task's stdout and stderr outputs, syslog and jobconf. The output from the debug script's stdout and stderr is displayed on the console diagnostics and also as part of the job UI.

In the following sections we discuss how to submit a debug script with a job. The script file needs to be distributed and submitted to the framework.

How to distribute the script file:

The user needs to use DistributedCache to distribute and symlink the script file.

How to submit the script:

A quick way to submit the debug script is to set values for the properties mapreduce.map.debug.script and mapreduce.reduce.debug.script, for debugging map and reduce tasks respectively. These properties can also be set by using APIsConfiguration.set(MRJobConfig.MAP_DEBUG_SCRIPT, String) and Configuration.set(MRJobConfig.REDUCE_DEBUG_SCRIPT, String). In streaming mode, a debug script can be submitted with the command-line options -mapdebug and -reducedebug, for debugging map and reduce tasks respectively.

The arguments to the script are the task's stdout, stderr, syslog and jobconf files. The debug command, run on the node where the MapReduce task failed, is: 
$script $stdout $stderr $syslog $jobconf

Pipes programs have the c++ program name as a fifth argument for the command. Thus for the pipes programs the command is 
$script $stdout $stderr $syslog $jobconf $program

Default Behavior:

For pipes, a default script is run to process core dumps under gdb, prints stack trace and gives info about running threads.

Data Compression

Hadoop MapReduce provides facilities for the application-writer to specify compression for both intermediate map-outputs and the job-outputs i.e. output of the reduces. It also comes bundled with CompressionCodec implementation for the zlib compression algorithm. The gzipbzip2snappy, and lz4 file format are also supported.

Hadoop also provides native implementations of the above compression codecs for reasons of both performance (zlib) and non-availability of Java libraries. More details on their usage and availability are available here.

Intermediate Outputs

Applications can control compression of intermediate map-outputs via the Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS, boolean) api and the CompressionCodec to be used via the Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, Class) api.

Job Outputs

Applications can control compression of job-outputs via the FileOutputFormat.setCompressOutput(Job, boolean) api and the CompressionCodec to be used can be specified via the FileOutputFormat.setOutputCompressorClass(Job, Class) api.

If the job outputs are to be stored in the SequenceFileOutputFormat, the required SequenceFile.CompressionType (i.e. RECORD / BLOCK - defaults to RECORD) can be specified via the SequenceFileOutputFormat.setOutputCompressionType(Job, SequenceFile.CompressionType) api.

Skipping Bad Records

Hadoop provides an option where a certain set of bad input records can be skipped when processing map inputs. Applications can control this feature through the SkipBadRecords class.

This feature can be used when map tasks crash deterministically on certain input. This usually happens due to bugs in the map function. Usually, the user would have to fix these bugs. This is, however, not possible sometimes. The bug may be in third party libraries, for example, for which the source code is not available. In such cases, the task never completes successfully even after multiple attempts, and the job fails. With this feature, only a small portion of data surrounding the bad records is lost, which may be acceptable for some applications (those performing statistical analysis on very large data, for example).

By default this feature is disabled. For enabling it, refer to SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) and SkipBadRecords.setReducerMaxSkipGroups(Configuration, long).

With this feature enabled, the framework gets into 'skipping mode' after a certain number of map failures. For more details, see SkipBadRecords.setAttemptsToStartSkipping(Configuration, int). In 'skipping mode', map tasks maintain the range of records being processed. To do this, the framework relies on the processed record counter. See SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS and SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS. This counter enables the framework to know how many records have been processed successfully, and hence, what record range caused a task to crash. On further attempts, this range of records is skipped.

The number of records skipped depends on how frequently the processed record counter is incremented by the application. It is recommended that this counter be incremented after every record is processed. This may not be possible in some applications that typically batch their processing. In such cases, the framework may skip additional records surrounding the bad record. Users can control the number of skipped records through SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) andSkipBadRecords.setReducerMaxSkipGroups(Configuration, long). The framework tries to narrow the range of skipped records using a binary search-like approach. The skipped range is divided into two halves and only one half gets executed. On subsequent failures, the framework figures out which half contains bad records. A task will be re-executed till the acceptable skipped value is met or all task attempts are exhausted. To increase the number of task attempts, use Job.setMaxMapAttempts(int) and Job.setMaxReduceAttempts(int)

Skipped records are written to HDFS in the sequence file format, for later analysis. The location can be changed through SkipBadRecords.setSkipOutputPath(JobConf, Path).

Example: WordCount v2.0

Here is a more complete WordCount which uses many of the features provided by the MapReduce framework we discussed so far.

This needs the HDFS to be up and running, especially for the DistributedCache-related features. Hence it only works with a pseudo-distributed or fully-distributed Hadoop installation.

Source Code
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class WordCount2 {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();

    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      if (conf.getBoolean("wordcount.skip.patterns", true)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    if (!(remainingArgs.length != 2 || remainingArgs.length != 4)) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List<String> otherArgs = new ArrayList<String>();
    for (int i=0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
Sample Runs

Sample text-files as input:

$ bin/hdfs dfs -ls /user/joe/wordcount/input/ 
/user/joe/wordcount/input/file01 
/user/joe/wordcount/input/file02 

$ bin/hdfs dfs -cat /user/joe/wordcount/input/file01 
Hello World, Bye World! 

$ bin/hdfs dfs -cat /user/joe/wordcount/input/file02 
Hello Hadoop, Goodbye to hadoop.

Run the application:

$ bin/hadoop jar wc.jar WordCount2 /user/joe/wordcount/input /user/joe/wordcount/output

Output:

$ bin/hdfs dfs -cat /user/joe/wordcount/output/part-r-00000 
Bye 1 
Goodbye 1 
Hadoop, 1 
Hello 2 
World! 1 
World, 1 
hadoop. 1 
to 1

Notice that the inputs differ from the first version we looked at, and how they affect the outputs.

Now, lets plug-in a pattern-file which lists the word-patterns to be ignored, via the DistributedCache.

$ bin/hdfs dfs -cat /user/joe/wordcount/patterns.txt 
\. 
\, 
\! 
to

Run it again, this time with more options:

$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=true /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

As expected, the output:

$ bin/hdfs dfs -cat /user/joe/wordcount/output/part-r-00000 
Bye 1 
Goodbye 1 
Hadoop 1 
Hello 2 
World 2 
hadoop 1

Run it once more, this time switch-off case-sensitivity:

$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=false /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

Sure enough, the output:

$ bin/hdfs dfs -cat /user/joe/wordcount/output/part-r-00000 
bye 1 
goodbye 1 
hadoop 2 
hello 2 
horld 2

Highlights

The second version of WordCount improves upon the previous one by using some features offered by the MapReduce framework:

  • Demonstrates how applications can access configuration parameters in the setup method of the Mapper (and Reducer) implementations.
  • Demonstrates how the DistributedCache can be used to distribute read-only data needed by the jobs. Here it allows the user to specify word-patterns to skip while counting.
  • Demonstrates the utility of the GenericOptionsParser to handle generic Hadoop command-line options.
  • Demonstrates how applications can use Counters and how they can set application-specific status information passed to the map (and reduce) method.

Java and JNI are trademarks or registered trademarks of Oracle America, Inc. in the United States and other countries.








MapReduce 튜토리얼

목적

이 문서는 Hadoop MapReduce 프레임 워크의 모든 사용자 지향 측면을 포괄적으로 설명하고 자습서 역할을합니다.

선결 요건

Hadoop이 설치되고 구성되었으며 실행 중인지 확인하십시오. 자세한 내용은:

개요

Hadoop MapReduce는 대용량 클러스터 (수천 노드)의 대량 하드웨어에서 방대한 양의 데이터 (멀티 테라 바이트 데이터 세트)를 안정적이고 내결함성이있는 방식으로 병렬 처리하는 애플리케이션을 쉽게 작성하기위한 소프트웨어 프레임 워크입니다.

MapReduce 작업은 일반적으로 입력 데이터 세트를 완전히 병렬 방식으로 맵 작업 에 의해 처리되는 독립적 인 청크로 분할합니다 프레임 워크는지도의 출력을 정렬 한 다음 축소 작업에 입력 합니다 . 일반적으로 작업의 입력과 출력은 모두 파일 시스템에 저장됩니다. 프레임 워크는 작업 스케줄링, 작업 모니터링 및 실패한 작업 재실행을 처리합니다.

일반적으로 계산 노드와 스토리지 노드는 동일합니다. 즉, MapReduce 프레임 워크와 Hadoop 분산 파일 시스템 ( HDFS 아키텍처 가이드 참조 )은 동일한 노드 집합에서 실행됩니다. 이 구성을 사용하면 프레임 워크에서 데이터가 이미있는 노드에서 작업을 효과적으로 스케줄링 할 수 있으므로 클러스터 전체에서 매우 높은 총 대역폭이 발생합니다.

MapReduce 프레임 워크는 단일 마스터 ResourceManager , 클러스터 노드 당 하나의 슬레이브 NodeManager 및 애플리케이션 당 MRAppMaster 로 구성됩니다 ( YARN 아키텍처 가이드 참조 ).

애플리케이션 은 적절한 인터페이스 및 / 또는 추상 클래스의 구현을 통해 입력 / 출력 위치를 지정하고 맵을 제공 하고 기능을  입니다. 이들 및 다른 작업 매개 변수는 작업 구성을 구성합니다 .

그런 다음 Hadoop 작업 클라이언트 는 ResourceManager에 작업 (jar / executable 등) 및 구성을 제출하여 소프트웨어 / 구성을 슬레이브에 배포하고 작업을 예약하고 모니터링하며 작업 및 상태 정보에 상태 및 진단 정보를 제공합니다. 고객.

Hadoop 프레임 워크가 Java ™로 구현되었지만 MapReduce 애플리케이션을 Java로 작성할 필요는 없습니다.

  • Hadoop Streaming 은 매퍼 및 / 또는 감속기로 실행 파일 (예 : 쉘 유틸리티)이있는 작업을 만들고 실행할 수있는 유틸리티입니다.
  • Hadoop Pipes 는 MapReduce 애플리케이션 (JNI 기반이 아닌)을 구현 하는 SWIG 호환 C ++ API입니다.

입력 및 출력

MapReduce 프레임 워크는 <key, value> 쌍에 대해 독점적으로 작동합니다 . 즉, 프레임 워크는 작업에 대한 입력을 <key, value> 쌍의 집합으로보고, <key, value> 쌍 의 집합을 직업, 다른 종류의 생각.

 와  클래스는 프레임 워크에 의해 직렬화 가능해야하고 따라서 구현해야 쓰기 가능한 인터페이스를. 또한 키 클래스는 프레임 워크별로 정렬을 용이하게하기 위해 WritableComparable 인터페이스를 구현해야합니다 .

MapReduce 작업의 입력 및 출력 유형 :

(입력) <K1, V1> -> 맵 -> <K2, V2> -> 결합 -> <K2, V2> -> 감소 -> <K3, V3> (출력)

예 : WordCount v1.0

세부 사항으로 들어가기 전에 예제 MapReduce 애플리케이션을 살펴보고 어떻게 작동하는지 살펴 보자.

WordCount 는 주어진 입력 세트에서 각 단어의 발생 횟수를 계산하는 간단한 응용 프로그램입니다.

이것은 로컬 독립형, 의사 분산 또는 완전 분산 Hadoop 설치 ( 단일 노드 설정 )에서 작동합니다.

소스 코드

import java.io.IOException; 
import java.util.StringTokenizer; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

공용 클래스 WordCount { 

  public static class TokenizerMapper는 
       매퍼를 확장합니다. <Object, Text, Text, IntWritable> { 

    개인 최종 정적 IntWritable one = 새 IntWritable (1);
    개인 텍스트 단어 = 새 텍스트 (); 

    public void map (Object key, Text value, Context context 
                    ) throws IOException, InterruptedException { 
      StringTokenizer itr = new StringTokenizer (value.toString ()); 
      while (itr.hasMoreTokens ()) { 
        word.set (itr.nextToken ()); 
        context.write (word, one); 
      } 
    } 
  } 

  public static class IntSumReducer는 
       Reducer <Text, IntWritable, Text, IntWritable>을 확장합니다. { 
    private IntWritable result = new IntWritable (); 

    public void reduce (텍스트 키, Iterable <IntWritable> 값, 
                       컨텍스트 컨텍스트
                       ) IOException을 던집니다, InterruptedException { 
      int sum = 0; 
      for (IntWritable val : 값) { 
        sum + = val.get (); 
      } 
      result.set (합계); 
      context.write (key, result); 
    } 
  } 

  public static void main (String [] args) 예외를 throw합니다. { 
    Configuration conf = new Configuration (); 
    작업 작업 = Job.getInstance (conf, "word count"); 
    job.setJarByClass (WordCount.class); 
    job.setMapperClass (TokenizerMapper.class); 
    job.setCombinerClass (IntSumReducer.class); 
    job.setReducerClass (IntSumReducer.class); 
    job.setOutputKeyClass (Text.class);
    job.setOutputValueClass (IntWritable.class); 
    FileInputFormat.addInputPath (job, newPath (args [0])); 
    FileOutputFormat.setOutputPath (job, 새 경로 (args [1])); 
    System.exit (job.waitForCompletion (true)? 0 : 1); 
  } 
}

용법

환경 변수가 다음과 같이 설정된다고 가정하십시오.

export JAVA_HOME = / usr / java / default 
export PATH = $ JAVA_HOME / bin : $ PATH 
export HADOOP_CLASSPATH = $ JAVA_HOME / lib / tools.jar

WordCount.java를 컴파일 하고 jar 파일 을 만듭니다.

$ bin / hadoop com.sun.tools.javac.Main WordCount.java 
$ jar cf wc.jar WordCount * .class

가정 할 때 :

  • / user / joe / wordcount / input - HDFS의 입력 디렉토리
  • / user / joe / wordcount / output - HDFS의 출력 디렉토리

샘플 텍스트 파일을 입력으로 사용 :

$ bin / hdfs dfs -ls / user / joe / wordcount / input / 
/ user / joe / wordcount / input / file01 
/ user / joe / wordcount / input / file02

$ bin / hdfs dfs -cat / user / joe / wordcount / input / file01 
Hello World 안녕 월드

$ bin / hdfs dfs -cat / user / joe / wordcount / input / file02 
Hello Hadoop 안녕히 가세요

응용 프로그램 실행 :

$ bin / hadoop jar wc.jar WordCount / user / joe / wordcount / input / user / joe / wordcount / output

산출:

$ bin / hdfs dfs -cat / user / joe / wordcount / output / part-r-00000

안녕히 가세요 1 
안녕히 계세요 1 
하둡 2 
안녕 2 
세계 2

응용 프로그램은 -files 옵션을 사용하여 작업의 현재 작업 디렉토리에 존재할 쉼표로 구분 된 경로 목록을 지정할 수 있습니다 -libjars 옵션은 응용 프로그램이 맵의 클래스 경로 및 감소에 항아리를 추가 할 수 있습니다. -archives 옵션을 사용하면 쉼표로 구분 된 아카이브 목록을 인수로 전달할 수 있습니다. 이러한 아카이브는 아카이브되지 않고 아카이브의 이름이있는 링크가 태스크의 현재 작업 디렉토리에 작성됩니다. 명령 행 옵션에 대한 자세한 내용은 명령 안내서를 참조하십시오 .

-libjars , -files 및 -archives로 wordcount 예제 실행 bin / hadoop jar hadoop-mapreduce-examples- <ver> .jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip 입력 결과 여기서 myarchive입니다. zip은 "myarchive.zip"이라는 이름으로 디렉토리에 저장되고 압축 해제됩니다.

#을 사용하여 -files 및 -archives 옵션을 통해 전달 된 파일 및 아카이브에 대해 다른 기호 이름을 지정할 수 있습니다 .

예를 들어, bin / hadoop jar hadoop-mapreduce-examples- <ver> .jar wordcount -files dir1 / dict.txt # dict1, dir2 / dict.txt # dict2 -archives mytar.tgz # tgzdir 입력 결과 여기서 dir1 /dict.txt 및 dir2 / dict.txt는 각각 dict1 및 dict2라는 기호 이름을 사용하여 태스크에 의해 액세스 될 수 있습니다. 아카이브 mytar.tgz는 "tgzdir"이름으로 디렉토리에 저장되고 아카이브 해제됩니다.

워크 쓰루

단어 수의 응용 프로그램은 매우 직선적이다.

    public void map (Object key, Text value, Context context 
                    ) throws IOException, InterruptedException { 
      StringTokenizer itr = new StringTokenizer (value.toString ()); 
      while (itr.hasMoreTokens ()) { 
        word.set (itr.nextToken ()); 
        context.write (word, one); 
      } 
    }

매퍼 구현은 비아 지도 지정된 의해 제공되는 방법에있어서, 한 번에 하나 개의 행을 처리 TextInputFormat . 그런 다음 줄을 StringTokenizer 를 통해 공백으로 구분 된 토큰으로 분할하고 <<word>, 1> 의 키 - 값 쌍을 내 보냅니다 .

주어진 샘플 입력에 대해 첫 번째 맵은 다음을 방출합니다. 
<Hello, 1> 
<World, 1> 
<Bye, 1> 
<World, 1>

두 번째 맵은 
<Hello, 1> 
<Hadoop, 1> 
<Goodbye, 1> 
<Hadoop, 1>

주어진 작업에 대해 생성 된 맵의 수와 세부적인 방법으로 맵을 제어하는 ​​방법에 대해 더 배우게됩니다. 나중에이 튜토리얼에서 설명합니다.

    job.setCombinerClass (IntSumReducer.class);

WordCount 는 결합 자도 지정합니다 따라서 각지도의 출력은 로컬 결합 자 ( 작업 구성 과 마찬가지로 감속기 와 동일 함 )를 통해  집계 후 로컬 집계에 전달 됩니다.

첫 번째지도의 출력 : 
<Bye, 1> 
<Hello, 1> 
<World, 2>

두 번째 맵 출력 : 
<Goodbye, 1> 
<Hadoop, 2> 
<Hello, 1>

    public void reduce (텍스트 키, Iterable <IntWritable> 값, 
                       컨텍스트 컨텍스트 
                       ) throw IOException, InterruptedException { 
      int sum = 0; 
      for (IntWritable val : 값) { 
        sum + = val.get (); 
      } 
      result.set (합계); 
      context.write (key, result); 
    }

감속기 비아 구현 줄일 방법은 각 키의 선두로부터 카운트 (이 예에서는, 즉 단어) 인 값을 요약한다.


<Bye, 1> 
<Goodbye, 1> 
<Hadoop, 2> 
<Hello, 2> <World, 2> 작업 출력은 다음과 같습니다.

주요 방법은 등의 여러 예 (명령 라인을 통해 전달), 입력 / 출력 경로 등의 작업면, 키 / 값 유형의 입 / 출력 형식을 지정하는 작업 . 그런 다음 job.waitForCompletion 을 호출하여 작업을 제출하고 진행 상황을 모니터링합니다.

자습서에서 Job , InputFormat , OutputFormat 및 기타 인터페이스와 클래스 에 대해 더 자세히 배웁니다 .

MapReduce - 사용자 인터페이스

이 섹션에서는 MapReduce 프레임 워크의 모든 사용자 지향 측면에 대해 적절한 양의 세부 정보를 제공합니다. 이렇게하면 사용자가 세부적인 방법으로 작업을 구현, 구성 및 조정하는 데 도움이됩니다. 그러나 각 클래스 / 인터페이스에 대한 javadoc은 가장 포괄적 인 문서로 남아 있습니다. 이것은 자습서 일뿐입니다.

먼저 매퍼 및 감속기 인터페이스를 살펴 보겠습니다 응용 프로그램은 일반적으로 제공하도록 구현 지도 및 감소 방법.

그런 다음 Job , Partitioner , InputFormat , OutputFormat 등의 다른 핵심 인터페이스에 대해 논의 할 것 입니다.

마지막으로, 우리는 DistributedCache , IsolationRunner 등과 같은 프레임 워크의 유용한 기능에 대해 논의 할 것입니다 .

유효 탑재량

응용 프로그램은 일반적으로 구현 매퍼 및 감속기 제공하기 위해 인터페이스를 지도 및 감소 방법. 이들은 직업의 핵심입니다.

매퍼

Mapper 는 입력 키 / 값 쌍을 중간 키 / 값 쌍의 집합으로 매핑합니다.

맵은 입력 레코드를 중간 레코드로 변환하는 개별 태스크입니다. 변환 된 중간 레코드는 입력 레코드와 동일한 유형일 필요는 없습니다. 주어진 입력 쌍은 0 또는 여러 출력 쌍에 매핑 될 수 있습니다.

Hadoop MapReduce 프레임 워크 는 작업 의 InputFormat 에 의해 생성 된 각 InputSplit 에 대해 하나의 맵 작업을 생성합니다 .

전반적으로, Mapper 구현은 Job.setMapperClass (Class) 메소드 를 통해 작업에 대한 작업 에 전달 됩니다. 그런 다음 프레임 워크 는 해당 작업 에 대한 InputSplit의 각 키 / 값 쌍에 대해 map (WritableComparable, Writable, Context) 을 호출합니다 그런 다음 응용 프로그램은 정리 (Context) 메소드를 대체하여 필수 정리를 수행 할 수 있습니다.

출력 쌍은 입력 쌍과 동일한 유형 일 필요는 없습니다. 주어진 입력 쌍은 0 또는 여러 출력 쌍에 매핑 될 수 있습니다. 출력 쌍은 context.write (WritableComparable, Writable)를 호출하여 수집됩니다.

응용 프로그램은 카운터 를 사용하여 통계를보고 할 수 있습니다 .

주어진 출력 키와 관련된 모든 중간 값은 프레임 워크에 의해 그룹화되고 감속기로 전달되어 최종 출력을 결정합니다. 사용자는 Job.setGroupingComparatorClass (Class) 를 통해 Comparator 를 지정하여 그룹화를 제어 할 수 있습니다 .

매퍼 출력은 정렬 후마다 분할된다 감속기 . 파티션의 총 수는 작업의 축소 작업 수와 같습니다. 사용자는 사용자 정의 Partitioner 를 구현하여 Reducer 로 이동하는 키 (따라서 레코드)를 제어 할 수 있습니다 .

사용자는 Job.setCombinerClass (Class) 를 통해 결합 자를 선택적으로 지정 하여 중간 출력의 로컬 집계를 수행 할 수 있으므로 Mapper 에서 Reducer 로 전송되는 데이터의 양을 줄일 수 있습니다.

중간의 정렬 된 출력은 항상 간단한 (키 - len, 키, 값 - 값, 값) 형식으로 저장됩니다. 응용 프로그램은 중간 출력을 압축할지 여부와 방법을 제어 하고 구성을 통해 사용할 CompressionCodec 을 제어 할 수 있습니다 .

얼마나 많은지도가 있습니까?

맵의 수는 일반적으로 입력의 전체 크기, 즉 입력 파일의 총 블록 수에 의해 결정됩니다.

지도에 대한 적절한 병렬 처리 수준은 노드 당 약 10-100 개의지도 인 것으로 보이지만 매우 가벼운지도 작업을 위해 300 개의지도가 설정되었습니다. 작업 설정에 시간이 걸리므로 맵을 실행하는 데 최소 1 분이 걸리는 것이 가장 좋습니다.

따라서 입력 데이터가 10TB이고 블록 크기가 128MB 인 경우 Configuration.set ( MRJobConfig.NUM_MAPS , int) (프레임 워크에 대한 힌트 만 제공)을 사용하여 설정 하지 않는 한 82,000 개의지도 가 생성됩니다 심지어 더 높습니다.

감속기

감속기 는 작은 값 집합에 대한 키를 공유하는 중간 값 집합을 줄입니다.

작업 감소 횟수는 Job.setNumReduceTasks (int) 를 통해 사용자가 설정합니다 .

전반적으로 감속기 구현은 Job.setReducerClass (Class) 메소드 를 통해 작업에 대한 작업 에 전달되며 이를 초기화하여 자체 초기화 할 수 있습니다. 그런 다음 프레임 워크 는 그룹화 된 입력의 각 <키, (값 목록)> 쌍에 대해 reduce (WritableComparable, Iterable <Writable>, Context) 메서드를 호출합니다 그런 다음 응용 프로그램은 정리 (Context) 메소드를 대체하여 필수 정리를 수행 할 수 있습니다.

감속기 에는 3 가지 주요 단계가 있습니다 : 섞기, 분류 및 줄입니다.

혼합

Reducer 로의 입력 은 매퍼의 정렬 된 출력입니다. 이 단계에서 프레임 워크는 HTTP를 통해 모든 맵퍼 출력의 관련 파티션을 가져옵니다.

종류

이 단계에서 프레임 워크는 감속기 입력을 키 별로 그룹화 합니다 (다른 매퍼가 동일한 키를 출력 할 수 있기 때문에).

셔플 및 정렬 단계가 동시에 발생합니다. 맵 출력은 페치되고 병합됩니다.

보조 정렬

축소 전에 중간 키를 그룹화하는 등가 규칙이 그룹화 키와 다른 경우, Job.setSortComparatorClass (Class) 를 사용해 Comparator 를 지정할 수 있습니다 이후 Job.setGroupingComparatorClass (종류)가 그룹화되는 방법, 중간 키를 제어하기 위해 사용될 수 있으며, 이들은 시뮬레이트 함께 사용할 수있는 값을 보조 정렬 방법 .

줄이다

이 단계에서 reduce (WritableComparable, Iterable <Writable>, Context) 메소드가 그룹화 된 입력의 각 <key, (list of values)> 쌍에 대해 호출 됩니다.

reduce 태스크의 출력은 일반적으로 Context.write (WritableComparable, Writable)를 통해 FileSystem에 기록됩니다 .

응용 프로그램은 카운터 를 사용하여 통계를보고 할 수 있습니다 .

의 출력 감속기가 되어 정렬되지 않습니다 .

얼마나 많은 것이 줄어든가?

올바른 감소 수는 0.95 또는 1.75 배 (< 노드 수 > * < 노드 당 최대 컨테이너 수 >)로 표시됩니다.

0.95를 사용하면 모든 감소가 즉시 시작되어지도가 완료되면지도 출력을 전송할 수 있습니다. 로 1.75 빠른 노드는 감소의 첫 라운드를 완료하고로드 밸런싱의 더 나은 일을 줄일 수의 두 번째 물결을 시작합니다.

감소 횟수를 늘리면 프레임 워크 오버 헤드는 증가하지만로드 균형 조정이 증가하고 실패 비용이 낮아집니다.

위의 스케일링 계수는 투기 - 작업 및 실패한 작업을 위해 프레임 워크에서 슬롯을 몇 개 줄이기 위해 전체 숫자보다 약간 적습니다.

감속기 없음

감축을 원할 경우 감축 작업 수를 0 으로 설정하는 것이 합법적 입니다.

이 경우 map-tasks의 출력은 직접 FileSystem으로 , FileOutputFormat.setOutputPath (Job, Path) 로 설정된 출력 경로로 이동합니다 프레임 워크는 맵 출력을 FileSystem에 쓰기 전에 정렬하지 않습니다 .

분할 자

파티션 작성자는 키 공간을 분할합니다.

Partitioner는 중간 맵 출력의 키 분할을 제어합니다. 키 (또는 키의 하위 집합)는 일반적으로 해시 함수 를 사용하여 파티션을 파생시키는 데 사용됩니다 파티션의 총 수는 작업의 축소 작업 수와 같습니다. 그러므로 이것은 m 중 어떤 것이 중간 키 (따라서 레코드)를 줄이기 위해 보내지 는지를 제어합니다 .

HashPartitioner 가 기본 Partitioner 입니다.

계수기

카운터 는 MapReduce 응용 프로그램이 통계를보고 할 수있는 기능입니다.

Mapper 및 Reducer 구현은 카운터 를 사용하여 통계를보고 할 수 있습니다 .

Hadoop MapReduce에는 일반적으로 유용한 매퍼, 축소 기 및 분할기 의 라이브러리 가 번들로 제공됩니다 .

작업 구성

Job 은 MapReduce 작업 구성을 나타냅니다.

Job 은 실행을 위해 MapReduce 작업을 Hadoop 프레임 워크에 설명하는 기본 인터페이스입니다. 프레임 워크는 Job 에서 설명한대로 작업을 충실하게 실행하려고 시도 하지만 다음 과 같이합니다 .

작업 은 일반적으로 Mapper , 결합 자 (있는 경우), Partitioner , Reducer , InputFormat , OutputFormat 구현 을 지정하는 데 사용됩니다 FileInputFormat 은 입력 파일 ( FileInputFormat.setInputPaths (Job, Path ...) / FileInputFormat.addInputPath (Job, Path) ) 및 ( FileInputFormat.setInputPaths (Job, String ...) / FileInputFormat.addInputPaths )) 및 출력 파일을 작성해야하는 곳 ( FileOutputFormat.setOutputPath (Path) ).

선택 사항으로 Job 은 Comparator 사용 여부 DistributedCache에 넣을 파일 , 중간 및 / 또는 작업 출력을 압축할지 여부 (방법), 작업 작업을 수행 할 수 있는지 여부 등과 같은 작업의 다른 고급 패싯을 지정 하는 데 사용됩니다 A의 실행 추론 방식 ( setMapSpeculativeExecution (부울) ) / setReduceSpeculativeExecution (부울) ), 작업 당 시도의 최대 수 ( setMaxMapAttempts (INT) / setMaxReduceAttempts (INT) ) 등

물론 사용자는 Configuration.set (String, String) / Configuration.get (String) 을 사용하여 응용 프로그램에 필요한 임의의 매개 변수를 설정 / 가져올 수 있습니다. 그러나 많은 양의 (읽기 전용) 데이터 에는 DistributedCache 를 사용하십시오 .

작업 실행 및 환경

MRAppMaster는 실행 매퍼 / 감속기 작업 별도의 JVM에서 하위 프로세스로합니다.

하위 작업은 상위 MRAppMaster 환경을 상속받습니다 사용자는 -djava 를 통해 공유 라이브러리를 검색하기 위해 런타임 링커가 비표준 경로와 같이 job의 mapreduce. {map | reduce} .java.opts 및 구성 매개 변수를 통해 child-jvm에 대한 추가 옵션을 지정할 수 있습니다 . library.path = <> 등 . mapreduce. {map | reduce} .java.opts 매개 변수 에 MapReduce 작업 의 taskid 값으로 보간 된 @ taskid @ 기호가 포함 된 경우 .

다음은 다중 인수 및 대체가있는 예입니다. jvm GC 로깅 및 암호가없는 JVM JMX 에이전트의 시작을 보여줌으로써 jconsole과 연결하여 자식 메모리, 스레드 및 스레드 덤프를 보는 것을 볼 수 있습니다. 또한 맵의 최대 힙 크기를 설정하고 하위 jvm을 각각 512MB 및 1024MB로 줄입니다. 또한 child-jvm 의 java.library.path 에 대한 추가 경로를 추가합니다 .

<property> 
  <name> mapreduce.map.java.opts </ name> 
  <value> 
    -Xmx512M -Djava.library.path = / home / mycompany / lib -verbose : gc -Xloggc : /tmp/@taskid@.gc 
    -dcom.sun.management.jmxremote.authenticate = false -Dcom.sun.management.jmxremote.ssl = false 
  </ 
property> 

<property> 
  <name> mapreduce.reduce.java.opts </ name> 
  < / 값> 
    -Xmx1024M -Djava.library.path = / home / mycompany / lib -verbose : gc -Xloggc : /tmp/@taskid@.gc 
    -Dcom.sun.management.jmxremote.authenticate = false -Dcom.sun.management .jmxremote.ssl = false 
  </ value> 
</ property>
메모리 관리

사용자 / 관리자는 실행 된 하위 작업의 최대 가상 메모리와 mapreduce . {map | reduce} .memory.mb를 사용하여 재귀 적으로 시작하는 하위 프로세스를 지정할 수도 있습니다. 여기에서 설정 한 값은 프로세스 별 제한입니다. mapreduce. {map | reduce} .memory.mb 값 은 메가 바이트 (MB)로 지정해야합니다. 또한 값은 JavaVM에 전달 된 -Xmx보다 크거나 같아야합니다. 그렇지 않으면 VM이 시작되지 않을 수 있습니다.

참고 : mapreduce. {map | reduce} .java.opts 는 MRAppMaster 에서 시작된 하위 작업을 구성하는 데에만 사용됩니다. 데몬에 대한 메모리 옵션 구성은 Hadoop 데몬의 환경 구성에 설명되어 있습니다.

프레임 워크의 일부에서 사용할 수있는 메모리도 구성 할 수 있습니다. 맵핑 및 축소 태스크에서 성능은 조작의 동시성에 영향을주는 매개 변수와 데이터가 디스크에 도달하는 빈도를 조정함으로써 영향을받을 수 있습니다. 작업에 대한 파일 시스템 카운터를 모니터링합니다. 특히 맵에서 바이트 수를 기준으로 감소 시키면 이러한 매개 변수를 조정하는 데 매우 중요합니다.

지도 매개 변수

맵에서 방출 된 레코드는 버퍼로 직렬화되고 메타 데이터는 회계 버퍼에 저장됩니다. 다음 옵션에서 설명하는 것처럼 직렬화 버퍼 또는 메타 데이터가 임계 값을 초과하면 맵의 출력이 계속되는 동안 버퍼의 내용이 백그라운드에서 디스크로 정렬되고 디스크에 기록됩니다. 유출이 진행되는 동안 버퍼가 완전히 채워지면 맵 스레드가 차단됩니다. 맵이 끝나면 나머지 레코드가 디스크에 기록되고 모든 디스크상의 세그먼트가 단일 파일로 병합됩니다. 디스크 유출 횟수를 최소화하면 맵 시간이 줄어들 수 있지만 버퍼가 클수록 맵퍼에서 사용할 수있는 메모리도 줄어 듭니다.

이름유형기술
mapreduce.task.io.sort.mbint맵에서 방출 된 레코드를 저장하는 순차 나열 및 계정 버퍼의 누적 크기 (MB)입니다.
mapreduce.map.sort.spill.percent흙손직렬화 버퍼의 소프트 한도. 일단 도달하면 스레드는 백그라운드에서 내용을 디스크에 흘리기 시작합니다.

기타주의 사항

  • 누출이 진행되는 동안 누적 임계 값을 초과하면 유출이 완료 될 때까지 수집이 계속됩니다. 예를 들어 mapreduce.map.sort.spill.percent 가 0.33으로 설정되고 유출이 실행되는 동안 버퍼의 나머지가 채워지면 다음 유출에는 수집 된 모든 레코드 또는 버퍼의 0.66이 포함되며 추가 유출 물을 생성한다. 즉, 임계 값은 차단을 정의하는 것이 아니라 트리거를 정의하는 것입니다.
  • 순차 화 버퍼보다 ​​큰 레코드는 먼저 유출을 트리거 한 다음 별도의 파일로 유출됩니다. 이 레코드가 결합자를 처음 통과할지 여부는 정의되지 않습니다.
셔플 / 축소 매개 변수

앞에서 설명한 것처럼 각 reduce는 HTTP를 통해 Partitioner가 할당 한 출력을 메모리로 가져 와서 주기적으로 디스크에 병합합니다. 맵 출력의 중간 압축이 켜지면 각 출력이 메모리로 압축 해제됩니다. 다음 옵션은 축소 이전에 디스크에 병합되는 빈도와 축소 중에 출력을 매핑하기 위해 할당 된 메모리에 영향을줍니다.

이름유형기술
mapreduce.task.io.soft.factorint동시에 병합 할 디스크의 세그먼트 수를 지정합니다. 병합 중에 열려있는 파일 및 압축 코덱의 수를 제한합니다. 파일 수가이 제한을 초과하면 병합은 여러 단계에서 진행됩니다. 이 한도가지도에도 적용되지만 대부분의 작업은이 한도에 도달하지 않을 수 있도록 구성되어야합니다.
mapreduce.reduce.merge.inmem.thresholdsint정렬 된 맵 출력의 수는 디스크에 병합되기 전에 메모리로 반입됩니다. 이전 노트의 스필 스 임계 값과 마찬가지로, 이것은 파티션 단위를 정의하는 것이 아니라 트리거입니다. 실제로는 메모리 세그먼트 병합이 종종 디스크에서 병합하는 것보다 비용이 적기 때문에 일반적으로이 값은 매우 높게 (1000) 또는 비활성화 (0)로 설정됩니다 (이 표 다음의 참고 참조). 이 임계 값은 셔플 중 메모리 내 병합의 빈도에만 영향을 미칩니다.
mapreduce.reduce.shuffle.merge.percent흙손메모리 내 병합이 시작되기 전에 가져온 맵 출력에 대한 메모리 임계 값으로, 메모리에 맵 출력을 저장하는 데 할당 된 메모리의 백분율로 표시됩니다. 메모리에 맞지 않는 맵 출력이 지연 될 수 있으므로이 값을 높게 설정하면 페치와 병합 간의 병렬 처리가 줄어들 수 있습니다. 반대로 입력 값이 메모리에 완전히 들어갈 수있는 값을 줄이려면 1.0의 값이 효과적입니다. 이 매개 변수는 셔플 중 메모리 내 병합 빈도에만 영향을줍니다.
mapreduce.reduce.shuffle.input.buffer.percent흙손shuffle 중에지도 출력을 저장하는 데 할당 할 수있는 mapreduce.reduce.java.opts에 일반적으로 지정된 최대 힙 크기에 상대적으로 차지하는 메모리 비율입니다 프레임 워크를 위해 일부 메모리를 별도로 설정해야하지만 일반적으로 크고 많은 맵 출력을 저장할 수있을 정도로 높게 설정하는 것이 좋습니다.
mapreduce.reduce.input.buffer.percent흙손지도 출력이 축소 중에 유지 될 수있는 최대 힙 크기에 상대적인 메모리 비율입니다. reduce가 시작되면 맵 출력은 남아있는 맵 출력이 이것이 정의한 리소스 제한을 초과 할 때까지 디스크에 병합됩니다. 기본적으로 모든 맵 출력은 디스크에 병합되어 축소가 사용 가능한 메모리를 최대화하기 시작합니다. 메모리를 많이 사용하지 않는 감소의 경우 디스크로 이동하지 않으려면이 값을 늘려야합니다.

기타주의 사항

  • 맵 출력이 맵 출력 복사에 할당 된 메모리의 25 퍼센트보다 큰 경우, 처음으로 메모리를 스테이징하지 않고 디스크에 직접 기록됩니다.
  • 결합 자로 실행할 때 높은 병합 임계 값 및 대형 버퍼에 대한 추론이 유효하지 않을 수 있습니다. 모든지도 출력을 가져 오기 전에 시작된 병합의 경우 결합기는 디스크로 흘러가면서 실행됩니다. 경우에 따라 맵 출력을 결합하여 디스크 유출을 작게하고 적극적으로 버퍼 크기를 늘리지 않고 유출 및 반입을 병렬 처리하는 자원을 소비함으로써 시간을 단축 할 수 있습니다.
  • 감소를 시작하기 위해 메모리 내 맵 출력을 디스크에 병합 할 때, 디스크에 이미 세그먼트가 있고 적어도 mapreduce.task.io.sort.factor 세그먼트 가 있기 때문에 중간 병합이 필요한 경우 인 메모리 맵 출력은 중간 병합의 일부가 될 수 있습니다.
구성된 매개 변수

다음 속성은 각 작업의 실행에 대한 작업 구성에 현지화되어 있습니다.

이름유형기술
mapreduce.job.id직업 ID
mapreduce.job.jar작업 디렉토리의 job.jar 위치
mapreduce.job.local.dir작업 특정 공유 스크래치 공간
mapreduce.task.id작업 ID
mapreduce.task.attempt.id작업 시도 ID
mapreduce.task.is.map부울이것은지도 작업인가요?
mapreduce.task.partitionint작업 내 작업의 ID입니다.
mapreduce.map.input.file지도가 읽는 파일 이름
mapreduce.map.input.start맵 입력 시작 부분의 오프셋
mapreduce.map.input.length맵 입력 분할의 바이트 수
mapreduce.task.output.dir태스크의 임시 출력 디렉토리

참고 : 스트리밍 작업을 실행하는 동안 "mapreduce"매개 변수의 이름이 변환됩니다. 점 (.)은 밑줄 (_)로 표시됩니다. 예를 들어, mapreduce.job.id는 mapreduce_job_id가되고 mapreduce.job.jar는 mapreduce_job_jar가됩니다. 스트리밍 작업의 매퍼 / 감속기에서 값을 얻으려면 매개 변수 이름을 밑줄과 함께 사용하십시오.

작업 로그

표준 출력 (stdout) 및 오류 (stderr) 스트림과 작업의 syslog는 NodeManager에서 읽고 $ {HADOOP_LOG_DIR} / userlogs에 기록됩니다 .

라이브러리 배포

DistributedCache는 또한지도에 사용하기 위해 모두 항아리와 네이티브 라이브러리를 배포 및 / 또는 작업을 줄일 수 있습니다. child-jvm은 항상 현재 작업 디렉토리 가 java.library.path 및 LD_LIBRARY_PATH에 추가됩니다 따라서 캐시 된 라이브러리는 System.loadLibrary 또는 System.load를 통해로드 할 수 있습니다 분산 캐시를 통해 공유 라이브러리를로드하는 방법에 대한 자세한 내용은 네이티브 라이브러리에 설명되어 있습니다.

작업 제출 및 모니터링

작업 은 User-job이 ResourceManager 와 상호 작용하는 기본 인터페이스 입니다.

Job 은 작업 제출, 진행 상황 추적, 구성 요소 태스크 보고서 및 로그 액세스, MapReduce 클러스터 상태 정보 가져 오기 등의 기능을 제공합니다.

작업 제출 프로세스에는 다음이 포함됩니다.

  1. 작업의 입력 및 출력 사양 확인.
  2. 작업에 대한 InputSplit 값을 계산합니다 .
  3. 필요한 경우 작업 의 DistributedCache 에 대한 필수 계정 정보 설정
  4. 작업의 jar 및 구성을 FileSystem 의 MapReduce 시스템 디렉토리에 복사합니다 .
  5. ResourceManager에 작업을 제출하고 필요에 따라 상태를 모니터링합니다.

작업 기록 파일은 사용자 지정 디렉토리 인 mapreduce.jobhistory.intermediate-done-dir 및 mapreduce.jobhistory.done-dir 에도 기 록되며, 작업 출력 디렉토리가 기본값입니다.

사용자는 다음 명령을 사용하여 지정된 디렉토리에서 히스토리 로그 요약을 볼 수 있습니다. 
$ mapred job -history output.jhist 
이 명령은 작업 세부 정보, 실패 및 종료 된 팁 정보를 인쇄합니다. 
성공적인 작업 및 각 작업에 대한 작업 시도와 같은 작업에 대한 자세한 내용은 다음 명령을 사용하여 볼 수 있습니다. 
$ mapred job -history all output.jhist

일반적으로 사용자는 Job 을 사용하여 응용 프로그램을 만들고, 작업의 다양한 측면을 설명하고, 작업을 제출하고 진행 상황을 모니터링합니다.

작업 제어

사용자는 단일 MapReduce 작업을 통해 수행 할 수없는 복잡한 작업을 수행하기 위해 MapReduce 작업을 체인화해야 할 수 있습니다. 작업 출력은 일반적으로 분산 파일 시스템으로 가고 출력은 차례대로 다음 작업의 입력으로 사용할 수 있기 때문에 이것은 매우 쉽습니다.

그러나 이는 또한 일자리 확보에 대한 부담 (성공 / 실패)이 고객에게 정당하다는 것을 의미합니다. 이 경우 다양한 직업 조정 옵션은 다음과 같습니다.

작업 입력

InputFormat 은 MapReduce 작업에 대한 입력 사양을 설명합니다.

MapReduce 프레임 워크 는 작업 의 InputFormat 을 사용하여 다음을 수행합니다.

  1. 작업의 입력 사양을 검증합니다.
  2. 입력 파일을 논리적 인 InputSplit 인스턴스 로 분할 하고 각 인스턴스는 개별 Mapper에 할당됩니다 .
  3. Mapper의 처리를 위해 논리적 InputSplit 에서 입력 레코드를 수집하는 데 사용되는 RecordReader 구현을 제공합니다 .

파일 기반 InputFormat 구현 의 기본 동작 ( 일반적으로 FileInputFormat의 하위 클래스)은 입력 파일의 총 크기 (바이트)를 기준으로 입력을 논리적 InputSplit 인스턴스 로 분할하는 것 입니다. 그러나 입력 파일 의 FileSystem 블록 크기는 입력 스플릿의 상한으로 처리됩니다. 분할 크기의 하한은 mapreduce.input.fileinputformat.split.minsize 를 통해 설정할 수 있습니다 .

분명히, 레코드 경계가 존중되어야하기 때문에 많은 응용 프로그램에서는 입력 크기를 기반으로하는 논리적 분할이 충분하지 않습니다. 이러한 경우, 응용 프로그램이 구현해야 RecordReader 기록 경계를 존중에 대한 책임과 논리의 기록 지향보기 제시 InputSplit 개별 작업에 있습니다.

TextInputFormat 는 기본 InputFormat 입니다.

경우 TextInputFormat이 는 IS InputFormat 주어진 작업, 프레임 워크는과 입력 파일을 감지 는 .gz의 확장과 자동으로 적절한 사용하여 압축 해제 CompressionCodec을 . 그러나 위의 확장자를 갖는 압축 파일은 분할 될 수 없으며 각 압축 파일은 단일 매퍼에 의해 전체적으로 처리 된다는 점에 유의해야합니다 .

InputSplit

InputSplit 은 개별 매퍼 가 처리 할 데이터를 나타냅니다 .

일반적 InputSplit는 입력의 바이트 중심의 뷰를 제공하고, 그것의 책임이다 RecordReader 처리 및 기록 중심 뷰를 제공한다.

FileSplit 이 기본 InputSplit 입니다. mapreduce.map.input.file 을 논리 분할에 대한 입력 파일의 경로로 설정합니다 .

RecordReader

RecordReader는 판독 <키 값> 에서 쌍 InputSplit를 .

일반적 RecordReader는 에 의해 제공되는 입력의 바이트 기반 뷰 변환 InputSplit을 하고, 상기 레코드가 지향 제공 매퍼 처리 구현. 따라서 RecordReader 는 레코드 경계 처리의 책임을 맡고 작업에 키와 값을 제공합니다.

작업 출력

OutputFormat 은 MapReduce 작업의 출력 사양을 설명합니다.

MapReduce 프레임 워크 는 작업 의 OutputFormat 을 사용하여 다음을 수행합니다.

  1. 작업의 출력 사양을 검증합니다. 예를 들어, 출력 디렉토리가 존재하지 않는지 확인하십시오.
  2. 작업의 출력 파일을 쓰는 데 사용되는 RecordWriter 구현을 제공하십시오 출력 파일은 FileSystem에 저장됩니다 .

TextOutputFormat 은 기본 OutputFormat 입니다.

OutputCommitter

OutputCommitter 는 MapReduce 작업에 대한 작업 출력 커밋을 설명합니다.

MapReduce 프레임 워크 는 작업 의 OutputCommitter 를 사용하여 다음 작업을 수행합니다.

  1. 초기화 중에 작업을 설정하십시오. 예를 들어, 작업 초기화 중 작업의 임시 출력 디렉토리를 작성하십시오. 작업 설정은 작업이 PREP 상태이고 작업 초기화 후 별도의 작업으로 수행됩니다. 설정 작업이 완료되면 작업이 실행 중 상태로 이동합니다.
  2. 작업 완료 후 작업을 정리하십시오. 예를 들어, 작업 완료 후 임시 출력 디렉토리를 제거하십시오. 작업 정리는 작업이 끝나면 별도의 작업으로 수행됩니다. 정리 작업이 완료되면 작업이 SUCCEDED / FAILED / KILLED로 선언됩니다.
  3. 작업 임시 출력을 설정하십시오. 작업 설정은 작업 초기화 중 동일한 작업의 일부로 수행됩니다.
  4. 작업에 커밋이 필요한지 확인하십시오. 이것은 태스크가 커밋을 필요로하지 않는 경우 커밋 프로 시저를 피하기위한 것입니다.
  5. 작업 출력 커밋. 일단 작업이 완료되면, 작업은 필요할 경우 출력을 커밋합니다.
  6. 커밋 된 태스크를 버립니다. 작업이 실패 / 종료되면 출력이 정리됩니다. 작업을 정리할 수 없으면 (예외 블록에서) 정리를 수행하기 위해 동일한 시도 ID로 별도의 작업이 실행됩니다.

FileOutputCommitter 는 기본 OutputCommitter 입니다. 작업 설정 / 정리 작업은 NodeManager에서 사용할 수있는 컨테이너를 맵핑하거나 축소합니다. JobCleanup 작업, TaskCleanup 작업 및 JobSetup 작업이 가장 높은 우선 순위를 갖습니다.

작업 부작용 파일

일부 응용 프로그램에서는 구성 요소 작업이 실제 작업 출력 파일과 다른 측면 파일을 작성 및 / 또는 작성해야합니다.

그러한 경우 FileSystem 에서 동일한 파일 (경로)을 열거 나 쓰려고 할 때 동시에 실행 되는 동일한 매퍼 또는 감속기 의 두 인스턴스 (예 : 추측 작업)에 문제가있을 수 있습니다 따라서 응용 프로그램 작성자는 작업마다 고유 한 이름을 선택해야합니다 (시도 번호 (예 : 시도 _200709221812_0001_m_000000_0 )).

이러한 문제를 피하기 위해 MapReduce 프레임 워크는 OutputCommitter 가 FileOutputCommitter 일 때 $ {mapreduce.task.output.dir}을 통해 액세스 할 수 있는 특별한 $ {mapreduce.output.fileoutputformat.outputdir} / _ temporary / _ $ {taskid } 각 작업에 대해 FileSystem 에서 시도한 작업 시도 의 출력이 저장됩니다. 작업 시도가 성공적으로 완료되면 $ {mapreduce.output.fileoutputformat.outputdir} / _ temporary / _ $ {taskid} (전용) 파일이 $ {mapreduce.output.fileoutputformat.outputdir}으로 승격 됩니다.물론 프레임 워크는 실패한 작업 시도의 하위 디렉토리를 삭제합니다. 이 프로세스는 애플리케이션에 완전히 투명합니다.

응용 프로그램 작성자는 FileOutputFormat.getWorkOutputPath (Conext) 를 통해 작업을 실행하는 동안 $ {mapreduce.task.output.dir}에 필요한 임의의 부가 파일을 생성하여이 기능을 활용할 수 있으며 프레임 워크는이를 성공적으로 승격시킵니다 작업 시도마다 고유 한 경로를 선택할 필요가 없습니다.

참고 : 특정 작업 시도를 실행하는 동안 $ {mapreduce.task.output.dir} 의 값 은 실제로 $ {mapreduce.output.fileoutputformat.outputdir} / _ temporary / _ {$ taskid}이며이 값은 MapReduce 프레임 워크. 따라서 MapReduce 태스크의 FileOutputFormat.getWorkOutputPath (Conext) 가 리턴 한 경로에 사이드 파일을 작성 하여이 기능을 활용하십시오.

감속기 = NONE (즉, 0 감소) 인 작업의 맵에 대해서는 전체 출력이 해당 맵의 출력이 HDFS로 직접 연결되기 때문에 사실입니다.

RecordWriter

RecordWriter 는 출력 <key, value> 쌍을 출력 파일에 씁니다 .

RecordWriter 구현은 작업 출력을 FileSystem에 기록 합니다.

기타 유용한 기능

대기열에 작업 제출

사용자가 대기열에 작업을 제출합니다. 큐는 작업 모음으로서 시스템이 특정 기능을 제공 할 수있게합니다. 예를 들어 대기열은 ACL을 사용하여 작업을 제출할 수있는 사용자를 제어합니다. 대기열은 Hadoop Scheduler에서 주로 사용됩니다.

Hadoop은 'default'라는 하나의 필수 대기열로 구성됩니다. 대기열 이름은 Hadoop 사이트 구성 의 mapreduce.job.queuename > 특성에 정의됩니다 용량 스케줄러 와 같은 일부 작업 스케줄러 는 여러 대기열을 지원합니다.

작업은 그것이를 통해 제출해야하는 큐 정의 mapreduce.job.queuename의 , 또는 Configuration.set (를 통해 속성을 MRJobConfig.QUEUE_NAME , 문자열) API. 대기열 이름 설정은 선택 사항입니다. 연관된 큐 이름없이 작업이 제출되면 '기본'큐로 제출됩니다.

카운터

카운터 는 MapReduce 프레임 워크 또는 응용 프로그램에서 정의한 전역 카운터를 나타냅니다. 각 카운터 는 모든 열거 형일 수 있습니다 특정 카운터 열거는 유형의 그룹으로 오므된다 Counters.Group .

응용 프로그램은 임의의 Counters (유형 Enum )를 정의 하고 지도 및 / 또는 메소드 에서 Counters.incrCounter (Enum, long) 또는 Counters.incrCounter (String, String, long)를 통해 카운터를 업데이트 할  있습니다. 그런 다음이 카운터는 프레임 워크에 의해 전체적으로 집계됩니다.

DistributedCache

DistributedCache 는 응용 프로그램 별 대규모의 읽기 전용 파일을 효율적으로 배포합니다.

DistributedCache 는 애플리케이션에 필요한 파일 (텍스트, 아카이브, 병 등)을 캐시하기 위해 MapReduce 프레임 워크에서 제공하는 기능입니다.

응용 프로그램은 작업의 URL (hdfs : //)을 통해 캐시 될 파일을 지정합니다 DistributedCache는 // URL을 이미에 존재 : HDFS를 통해 지정한 파일이 있다고 가정합니다 파일 시스템 .

프레임 워크는 작업에 대한 작업이 해당 노드에서 실행되기 전에 필요한 파일을 종속 노드에 복사합니다. 효율성은 파일이 작업 당 한 번만 복사되고 슬레이브에서 아카이브되지 않은 아카이브를 캐시 할 수 있다는 사실에서 유래합니다.

DistributedCache 는 캐시 된 파일의 수정 타임 스탬프를 추적합니다. 분명히 캐시 파일은 응용 프로그램에 의해 수정되거나 작업이 실행되는 동안 외부에서 수정되어서는 안됩니다.

DistributedCache 는 단순한 읽기 전용 데이터 / 텍스트 파일과 아카이브 및 병과 같은보다 복잡한 유형을 배포하는 데 사용할 수 있습니다. 아카이브 (zip, tar, tgz 및 tar.gz 파일)는 슬레이브 노드에서 아카이브되지 않습니다. 파일에 실행 권한이 설정되어 있습니다.

파일 / 아카이브는 mapreduce.job.cache. {files | archives} 속성을 설정하여 배포 할 수 있습니다 둘 이상의 파일 / 아카이브를 배포해야하는 경우 쉼표로 구분 된 경로로 추가 할 수 있습니다. 속성은 또한 API Job.addCacheFile (URI) / Job.addCacheArchive (URI) 및 Job.setCacheFiles (URI [])Job.setCacheArchives (URI []) 에서 설정할 수 있습니다. 여기서 URI는 hdfs : // host 형식입니다 : 포트 / 절대 경로 # 링크 이름 . 스트리밍에서 파일은 명령 줄 옵션 -cacheFile / -cacheArchive를 통해 배포 할 수 있습니다 .

DistributedCache는 또한 맵에서 사용하기위한 기본적인 소프트웨어 배포 메커니즘으로 사용 및 / 또는 작업을 줄일 수 있습니다. jar와 네이티브 라이브러리를 모두 배포하는 데 사용할 수 있습니다. Job.addArchiveToClassPath (경로) 또는 Job.addFileToClassPath (경로) API는 파일 / 항아리를 캐시 또한에 추가하는 데 사용할 수있는 클래스 패스 아이 JVM의. 구성 등록 정보 mapreduce.job.classpath. {파일 | 아카이브} 를 설정하여 동일하게 수행 할 수 있습니다 마찬가지로 작업의 작업 디렉토리에 심볼릭 링크 된 캐시 된 파일을 사용하여 기본 라이브러리를 배포하고로드 할 수 있습니다.

개인용 및 공용 DistributedCache 파일

DistributedCache 파일은 개인 또는 공용이 될 수 있으며 종속 노드에서 공유되는 방법을 결정합니다.

  • "개인"DistributedCache 파일은 해당 파일이 필요한 작업의 사용자에게 개인용 로컬 디렉토리에 캐시됩니다. 이 파일들은 특정 사용자의 모든 작업과 작업에 의해서만 공유되며 슬레이브에있는 다른 사용자의 작업으로는 액세스 할 수 없습니다. 파일이 업로드되는 파일 시스템 (일반적으로 HDFS)에 대한 사용 권한으로 인해 DistributedCache 파일은 비공개가됩니다. 파일에 읽을 수있는 액세스 권한이 없거나 파일로 이어지는 디렉토리 경로가 조회를 위해 실행 가능한 전역 액세스 권한이없는 경우 파일은 개인용이됩니다.
  • "공용"DistributedCache 파일은 전역 디렉토리에 캐시되며 파일 액세스는 모든 사용자가 공개적으로 볼 수 있도록 설정됩니다. 이러한 파일은 슬레이브에있는 모든 사용자의 작업 및 작업에 의해 공유 될 수 있습니다. 파일이 업로드되는 파일 시스템 (일반적으로 HDFS)에 대한 권한으로 인해 DistributedCache 파일은 공개됩니다. 파일이 세계에서 읽을 수있는 액세스 권한을 가지고 있고 파일로 이어지는 디렉토리 경로가 조회를 위해 실행 가능한 전역 액세스 권한을 갖는 경우 파일이 공개됩니다. 다시 말해, 사용자가 모든 사용자가 파일을 공개적으로 사용할 수있게하려는 경우 파일 사용 권한을 세계에서 읽을 수 있도록 설정해야하며 파일로 이어지는 경로의 디렉터리 사용 권한은 모두 실행 가능해야합니다.
프로파일 링

프로파일 링은 맵 샘플에 대해 내장 된 Java 프로파일 러의 대표적인 (2 또는 3) 샘플을 얻는 유틸리티입니다.

사용자는 구성 등록 정보 mapreduce.task.profile 을 설정하여 시스템이 작업의 일부 작업에 대한 프로파일 러 정보를 수집해야하는지 여부를 지정할 수 있습니다 이 값은 API Configuration.set ( MRJobConfig.TASK_PROFILE , boolean)을 사용하여 설정할 수 있습니다 값이 true 로 설정 되면, 태스크 프로파일 링이 사용 가능하게됩니다. 프로파일 러 정보는 사용자 로그 디렉토리에 저장됩니다. 기본적으로 프로파일 링은 작업에 사용되지 않습니다.

사용자가 프로파일 링 을 설정하면 MapReduce 태스크 범위 를 프로파일 링하기 위해 mapreduce.task.profile. {map | reduce} 구성 특성을 사용할 수 있습니다 이 값은 API Configuration.set ( MRJobConfig.NUM_ {MAP | REDUCE} _PROFILES , String)을 사용하여 설정할 수 있습니다 기본적으로 지정된 범위는 0-2 입니다.

사용자는 구성 등록 정보 mapreduce.task.profile.params를 설정하여 프로파일 러 구성 인수를 지정할 수도 있습니다 이 값은 API Configuration.set ( MRJobConfig.TASK_PROFILE_PARAMS , String)을 사용하여 지정할 수 있습니다 문자열에 % s 이 있으면 태스크가 실행될 때 프로파일 링 출력 파일의 이름으로 바뀝니다. 이 매개 변수는 명령 행에서 태스크 하위 JVM으로 전달됩니다. 프로파일 링 매개 변수의 기본값은 -agentlib : hprof = cpu = samples, heap = sites, force = n, thread = y, verbose = n, file = % s 입니다.

디버깅

MapReduce 프레임 워크는 디버깅을 위해 사용자 제공 스크립트를 실행할 수있는 기능을 제공합니다. MapReduce 작업이 실패하면 사용자는 예를 들어 작업 로그를 처리하기 위해 디버그 스크립트를 실행할 수 있습니다. 스크립트에는 작업의 stdout 및 stderr 출력 인 syslog 및 jobconf에 대한 액세스 권한이 부여됩니다. 디버그 스크립트의 stdout 및 stderr의 출력은 콘솔 진단 및 작업 UI의 일부로 표시됩니다.

다음 절에서는 작업과 함께 디버그 스크립트를 제출하는 방법에 대해 설명합니다. 스크립트 파일을 배포하여 프레임 워크에 제출해야합니다.

스크립트 파일을 배포하는 방법 :

사용자 는 스크립트 파일 을 배포 하고 심볼릭 링크 하기 위해 DistributedCache 를 사용해야 합니다.

스크립트 제출 방법 :

디버그 스크립트를 제출하는 빠른 방법은 map 및 reduce 작업을 각각 디버깅하기 위해 mapreduce.map.debug.script 및 mapreduce.reduce.debug.script 속성 값을 설정 하는 것입니다. 이러한 속성은 API를 사용하여 설정할 수 있습니다 Configuration.set ( MRJobConfig.MAP_DEBUG_SCRIPT , 문자열) 및 Configuration.set ( MRJobConfig.REDUCE_DEBUG_SCRIPT , 문자열) . 스트리밍 모드에서 map 및 reduce 작업을 각각 디버깅하기 위해 -mapdebug 및 -reducedebug 명령 줄 옵션을 사용하여 디버그 스크립트를 제출할 수 있습니다 .

스크립트에 대한 인수는 작업의 stdout, stderr, syslog 및 jobconf 파일입니다. MapReduce 작업이 실패한 노드에서 실행되는 debug 명령은 다음과 같습니다. 
$ script $ stdout $ stderr $ syslog $ jobconf

파이프 프로그램은 명령의 다섯 번째 인수로 c ++ 프로그램 이름을 갖습니다. 따라서 파이프 프로그램의 경우 명령은 
$ script $ stdout $ stderr $ syslog $ jobconf $ program입니다.

기본 동작 :

파이프의 경우 기본 스크립트는 gdb에서 코어 덤프를 처리하기 위해 실행되고 스택 추적을 인쇄하고 실행중인 스레드에 대한 정보를 제공합니다.

데이터 압축

Hadoop MapReduce는 애플리케이션 작성자가 중간 맵 출력과 작업 출력, 즉 축소 출력에 대해 압축을 지정하는 기능을 제공합니다. 또한 zlib 압축 알고리즘에 대한 CompressionCodec 구현 과 함께 번들로 제공됩니다 gzip을 , 레스 햇의 bzip2 , 물어 및 lz4의 파일 형식도 지원됩니다.

Hadoop은 또한 성능 (zlib)과 Java 라이브러리의 비 가용성을 이유로 위의 압축 코덱의 기본 구현을 제공합니다. 사용 및 가용성에 대한 자세한 내용은 여기를 참조하십시오 .

중간 출력

응용 프로그램은 Configuration.set ( MRJobConfig.MAP_OUTPUT_COMPRESS , 부울) API 및 Configuration.set ( MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC , 클래스) API 를 통해 사용되는 CompressionCodec 을 통해 중간 맵 출력의 압축을 제어 할 수 있습니다 .

작업 산출물

응용 프로그램은 FileOutputFormat.setCompressOutput (Job, boolean) API 를 통해 작업 출력 압축을 제어 할 수 있으며 사용되는 CompressionCodec 은 FileOutputFormat.setOutputCompressorClass (Job, Class) API를 통해 지정할 수 있습니다.

작업 출력을 SequenceFileOutputFormat에 저장 하려면 SequenceFileOutputFormat.setOutputCompressionType (Job, SequenceFile.CompressionType) API를 통해 필수 SequenceFile.CompressionType (즉 RECORD / BLOCK - RECORD로 설정 됨 )을 지정할 수 있습니다.

잘못된 레코드 건너 뜀

Hadoop은 맵 입력을 처리 할 때 일련의 잘못된 입력 레코드를 건너 뛸 수있는 옵션을 제공합니다. 응용 프로그램은 SkipBadRecords 클래스를 통해이 기능을 제어 할 수 있습니다 .

이 기능은지도 작업이 특정 입력에 결정적으로 충돌하는 경우에 사용할 수 있습니다. 이것은 대개 맵 기능의 버그로 인해 발생합니다. 일반적으로 사용자는 이러한 버그를 수정해야합니다. 그러나 이것은 때때로 가능하지 않습니다. 예를 들어 소스 코드를 사용할 수없는 타사 라이브러리에 버그가있을 수 있습니다. 이 경우 여러 번 시도한 후에도 작업이 성공적으로 완료되지 않으며 작업이 실패합니다. 이 기능을 사용하면 나쁜 레코드를 둘러싼 데이터의 일부만 손실되어 일부 응용 프로그램 (예 : 매우 큰 데이터에 대한 통계 분석을 수행하는 응용 프로그램)에서 수용 할 수 있습니다.

기본적으로이 기능은 비활성화되어 있습니다. 이를 활성화하려면 SkipBadRecords.setMapperMaxSkipRecords (Configuration, long) 및 SkipBadRecords.setReducerMaxSkipGroups (Configuration, long)을 참조하십시오 .

이 기능을 사용하면 특정 수의 맵 오류가 발생하면 프레임 워크가 '건너 뛰기 모드'가됩니다. 자세한 내용은 SkipBadRecords.setAttemptsToStartSkipping (Configuration, int)을 참조하십시오 '건너 뛰기 모드'에서지도 작업은 처리되는 레코드의 범위를 유지합니다. 이렇게하기 위해 프레임 워크는 처리 된 레코드 카운터에 의존합니다. SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS 및 SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS를 참조하십시오 이 카운터를 사용하면 프레임 워크에서 성공적으로 처리 된 레코드 수를 알 수 있으므로 어떤 레코드 범위로 인해 작업이 중단되는지 알 수 있습니다. 추가 시도에서이 범위의 레코드는 건너 뜁니다.

건너 뛴 레코드 수는 처리 된 레코드 카운터가 응용 프로그램에 의해 증가되는 빈도에 따라 다릅니다. 모든 레코드가 처리 된 후에이 카운터를 증가시키는 것이 좋습니다. 일반적으로 처리를 일괄 적으로 수행하는 일부 응용 프로그램에서는 불가능할 수 있습니다. 이러한 경우 프레임 워크는 나쁜 레코드를 둘러싼 추가 레코드를 건너 뛸 수 있습니다. 사용자는 SkipBadRecords.setMapperMaxSkipRecords (Configuration, long) 및 SkipBadRecords.setReducerMaxSkipGroups (Configuration, long)를 통해 건너 뛴 레코드 수를 제어 할 수 있습니다.프레임 워크는 2 진 검색과 유사한 접근 방식을 사용하여 건너 뛴 레코드의 범위를 좁히려고합니다. 건너 뛴 범위는 두 개의 절반으로 나누어지고 절반 만 실행됩니다. 후속 실패에서 프레임 워크는 어느 부분에 나쁜 레코드가 있는지 파악합니다. 허용 된 건너 뛴 값이 충족되거나 모든 작업 시도가 모두 소모 될 때까지 작업이 다시 실행됩니다. 작업 시도 횟수를 늘리려면 Job.setMaxMapAttempts (int) 및 Job.setMaxReduceAttempts (int)를 사용하십시오.

건너 뛴 레코드는 나중에 분석 할 수 있도록 시퀀스 파일 형식으로 HDFS에 기록됩니다. 위치는 SkipBadRecords.setSkipOutputPath (JobConf, Path)를 통해 변경할 수 있습니다 .

예 : WordCount v2.0

여기에 우리가 지금까지 논의한 MapReduce 프레임 워크가 제공하는 많은 기능을 사용 하는보다 완벽한 WordCount 가 있습니다.

이를 위해서는 특히 DistributedCache 관련 기능을 위해 HDFS를 설치하여 실행해야합니다 따라서 가상 배포 또는 완전 배포 된 Hadoop 설치 에서만 작동 합니다.

소스 코드
import java.io.BufferedReader; 
import java.io.FileReader; 
import java.io.IOException; 
import java.net.URI; 
import java.util.ArrayList; 
import java.util.HashSet; 
import java.util.List; 
import java.util.Set; 
import java.util.StringTokenizer; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.Counter; 
org.apache.hadoop.util.GenericOptionsParser 가져 오기; 
import org.apache.hadoop.util.StringUtils; 

공용 클래스 WordCount2 { 

  public static class TokenizerMapper는 
       Mapper <Object, Text, Text, IntWritable>를 확장합니다. { 

    static enum CountersEnum {INPUT_WORDS} 

    개인 최종 정적 IntWritable 하나 = 새 IntWritable (1); 
    개인 텍스트 단어 = 새 텍스트 (); 

    private 부울 caseSensitive; 
    개인 집합 <String> patternsToSkip = 새 HashSet <String> (); 

    개인 구성 conf; 
    private BufferedReader fis; 

    @보수
    공용 무효 설치 (컨텍스트 컨텍스트) IOException, 
        InterruptedException { 
      conf = context.getConfiguration () throw합니다 . 
      caseSensitive = conf.getBoolean ( "wordcount.case.sensitive", true); 
      if (conf.getBoolean ( "wordcount.skip.patterns", true)) { 
        URI [] patternsURIs = Job.getInstance (conf) .getCacheFiles (); 
        for (URI 패턴 URI : patternsURIs) { 
          Path patternsPath = 새 경로 (patternsURI.getPath ()); 
          String patternsFileName = patternsPath.getName (). toString (); 
          parseSkipFile (patternsFileName); 
        } 
      } 
    } 

    private void parseSkipFile (String fileName) { 
      try {
        fis = new BufferedReader (새 FileReader (fileName)); 
        문자열 패턴 = null; 
        while ((pattern = fis.readLine ())! = null) { 
          patternsToSkip.add (pattern); 
        } 
      } catch (IOException ioe) { 
        System.err.println ( "캐싱 된 파일을 구문 분석하는 동안 예외가 발생했습니다." 
            + StringUtils.stringifyException (ioe)); 
      } 
    } 

    @Override 
    public void map (Object key, Text value, Context context 
                    ) IOException, InterruptedException { 
      String line = (caseSensitive)? 
          value.toString () : value.toString (). toLowerCase ();
      for (String pattern : patternsToSkip) { 
        line = line.replaceAll (pattern, ""); 
      } 
      StringTokenizer itr = new StringTokenizer (line); 
      while (itr.hasMoreTokens ()) { 
        word.set (itr.nextToken ()); 
        context.write (word, one); 
        카운터 카운터 = context.getCounter (CountersEnum.class.getName (), 
            CountersEnum.INPUT_WORDS.toString ()); 
        counter.increment (1); 
      } 
    } 
  } 

  public static class IntSumReducer는 
       Reducer <Text, IntWritable, Text, IntWritable>을 확장합니다. { 
    private IntWritable result = new IntWritable ();

    public void reduce (텍스트 키, Iterable <IntWritable> 값, 
                       컨텍스트 컨텍스트 
                       ) throw IOException, InterruptedException { 
      int sum = 0; 
      for (IntWritable val : 값) { 
        sum + = val.get (); 
      } 
      result.set (합계); 
      context.write (key, result); 
    } 
  } 

  public static void main (String [] args) 예외를 throw합니다. { 
    Configuration conf = new Configuration (); 
    GenericOptionsParser optionParser = 새로운 GenericOptionsParser (conf, args); 
    String [] remainingArgs = optionParser.getRemainingArgs (); 
    if (! (remainingArgs.length! = 2 || remainingArgs.length! = 4)) {
      System.err.println ( "Usage : wordcount <in> <out> [-skip skipPatternFile]"); 
      System.exit (2); 
    } 
    Job job = Job.getInstance (conf, "word count"); 
    job.setJarByClass (WordCount2.class); 
    job.setMapperClass (TokenizerMapper.class); 
    job.setCombinerClass (IntSumReducer.class); 
    job.setReducerClass (IntSumReducer.class); 
    job.setOutputKeyClass (Text.class); 
    job.setOutputValueClass (IntWritable.class); 

    목록 <String> otherArgs = 새 ArrayList <String> (); 
        (새 경로 (remainingArgs [++ i]). toUri 
    (int i = 0; i <remainingArgs.length; ++ i) { 
      if ( "-skip".equals (remainingArgs [i])) { job.addCacheFile ());
        job.getConfiguration (). setBoolean ( "wordcount.skip.patterns", true); 
      } else { 
        otherArgs.add (remainingArgs [i]); 
      } 
    } 
    FileInputFormat.addInputPath (job, newPath (otherArgs.get (0))); 
    FileOutputFormat.setOutputPath (작업, 새 경로 (otherArgs.get (1))); 

    System.exit (job.waitForCompletion (true)? 0 : 1); 
  } 
}
샘플 실행

샘플 텍스트 파일을 입력으로 사용 :

$ bin / hdfs dfs -ls / user / joe / wordcount / input / 
/ user / joe / wordcount / input / file01 
/ user / joe / wordcount / input / file02 

$ bin / hdfs dfs -cat / user / joe / wordcount / input / file01 
Hello World, 안녕 세상! 

$ bin / hdfs dfs -cat / user / joe / wordcount / input / file02 
안녕 Hadoop, 안녕히 가세요.

응용 프로그램 실행 :

$ bin/hadoop jar wc.jar WordCount2 /user/joe/wordcount/input /user/joe/wordcount/output

Output:

$ bin/hdfs dfs -cat /user/joe/wordcount/output/part-r-00000 
Bye 1 
Goodbye 1 
Hadoop, 1 
Hello 2 
World! 1 
World, 1 
hadoop. 1 
to 1

Notice that the inputs differ from the first version we looked at, and how they affect the outputs.

Now, lets plug-in a pattern-file which lists the word-patterns to be ignored, via the DistributedCache.

$ bin/hdfs dfs -cat /user/joe/wordcount/patterns.txt 
\. 
\, 
\! 
to

Run it again, this time with more options:

$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=true /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

예상대로 출력 :

$ bin / hdfs dfs -cat / user / joe / wordcount / output-part-r-00000 
안녕히 계세요 안녕하세요 

하둡 2 
안녕하세요 2 
세계 2 
하프 1

한번 더 실행하면 이번에는 대소 문자를 구분하지 않습니다.

$ bin / hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive = false / user / joe / wordcount / input / user / joe / wordcount / output -skip /user/joe/wordcount/patterns.txt

물론 출력은 다음과 같습니다.

$ bin / hdfs dfs -cat / user / joe / wordcount / output / part-r-00000 
안녕히 가세요 1 
안녕히 계세요 1 안녕히 계세요 

안녕하세요 2 두 
세계 2

하이라이트

WordCount 의 두 번째 버전은 MapReduce 프레임 워크에서 제공하는 일부 기능을 사용하여 이전 버전 보다 향상되었습니다.

  • 응용 프로그램 이 Mapper (및 Reducer ) 구현 의 설치 메서드 에서 구성 매개 변수에 액세스하는 방법을 보여줍니다 .
  • 작업에 필요한 읽기 전용 데이터를 배포하는 데 DistributedCache 를 사용 하는 방법을 보여줍니다 여기에서는 사용자가 카운팅하는 동안 건너 뛰기위한 단어 패턴을 지정할 수 있습니다.
  • 일반 Hadoop 명령 줄 옵션을 처리 하는 GenericOptionsParser 의 유틸리티를 보여줍니다 .
  • 응용 프로그램에서 카운터 를 사용 하는 방법 과 지도 (및 축소 ) 메서드에 전달되는 응용 프로그램 별 상태 정보를 설정하는 방법을 보여줍니다 .

Java 및 JNI는 미국 및 기타 국가에서 사용되는 Oracle America, Inc.의 상표 또는 등록 상표입니다.



상당히 많은 인터넷글을 보면서 따라해보고 여러가지 다른방법시도 해보았지만 


apache 에서 제공하는 이 뉴얼이 가장 정확하였고 오류또한 없었다. 


nutch 구성 예정인 사용자라면


아파치에서 권장하는 nutch 와 hadoop 버전에 맞추어 설정하여야한다. 


nutch 2.3.1 ver = (Apache Hadoop 1.2.1 및 2.5.2) 



(*실제 nutch 구동떄문에 하둡의 무수히 많은 버전을 설치하는 삽질을하였다..)


Hadoop 2.5.2 / nutch 2.3.1 



https://archive.apache.org/dist/hadoop/core/hadoop-2.5.2/


Hadoop MapReduce Next Generation - Setting up a Single Node Cluster.

Purpose

This document describes how to set up and configure a single-node Hadoop installation so that you can quickly perform simple operations using Hadoop MapReduce and the Hadoop Distributed File System (HDFS).

Prerequisites

Supported Platforms

  • GNU/Linux is supported as a development and production platform. Hadoop has been demonstrated on GNU/Linux clusters with 2000 nodes.
  • Windows is also a supported platform but the followings steps are for Linux only. To set up Hadoop on Windows, see wiki page.

Required Software

Required software for Linux include:

  1. Java™ must be installed. Recommended Java versions are described at HadoopJavaVersions.
  2. ssh must be installed and sshd must be running to use the Hadoop scripts that manage remote Hadoop daemons.

Installing Software

If your cluster doesn't have the requisite software you will need to install it.

For example on Ubuntu Linux:

  $ sudo apt-get install ssh
  $ sudo apt-get install rsync

Download

To get a Hadoop distribution, download a recent stable release from one of the Apache Download Mirrors.

Prepare to Start the Hadoop Cluster

Unpack the downloaded Hadoop distribution. In the distribution, edit the file etc/hadoop/hadoop-env.sh to define some parameters as follows:

  # set to the root of your Java installation
  export JAVA_HOME=/usr/java/latest

  # Assuming your installation directory is /usr/local/hadoop
  export HADOOP_PREFIX=/usr/local/hadoop

Try the following command:

  $ bin/hadoop

This will display the usage documentation for the hadoop script.

Now you are ready to start your Hadoop cluster in one of the three supported modes:

Standalone Operation

By default, Hadoop is configured to run in a non-distributed mode, as a single Java process. This is useful for debugging.

The following example copies the unpacked conf directory to use as input and then finds and displays every match of the given regular expression. Output is written to the given output directory.

  $ mkdir input
  $ cp etc/hadoop/*.xml input
  $ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.2.jar grep input output 'dfs[a-z.]+'
  $ cat output/*

Pseudo-Distributed Operation

Hadoop can also be run on a single-node in a pseudo-distributed mode where each Hadoop daemon runs in a separate Java process.

Configuration

Use the following:

etc/hadoop/core-site.xml:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

etc/hadoop/hdfs-site.xml:

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

Setup passphraseless ssh

Now check that you can ssh to the localhost without a passphrase:

  $ ssh localhost

If you cannot ssh to localhost without a passphrase, execute the following commands:

  $ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
  $ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

Execution

The following instructions are to run a MapReduce job locally. If you want to execute a job on YARN, see YARN on Single Node.

  1. Format the filesystem:
      $ bin/hdfs namenode -format
  2. Start NameNode daemon and DataNode daemon:
      $ sbin/start-dfs.sh

    The hadoop daemon log output is written to the $HADOOP_LOG_DIR directory (defaults to $HADOOP_HOME/logs).

  3. Browse the web interface for the NameNode; by default it is available at:
    • NameNode - http://localhost:50070/
  4. Make the HDFS directories required to execute MapReduce jobs:
      $ bin/hdfs dfs -mkdir /user
      $ bin/hdfs dfs -mkdir /user/<username>
  5. Copy the input files into the distributed filesystem:
      $ bin/hdfs dfs -put etc/hadoop input
  6. Run some of the examples provided:
      $ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.2.jar grep input output 'dfs[a-z.]+'
  7. Examine the output files:

    Copy the output files from the distributed filesystem to the local filesystem and examine them:

      $ bin/hdfs dfs -get output output
      $ cat output/*

    or

    View the output files on the distributed filesystem:

      $ bin/hdfs dfs -cat output/*
  8. When you're done, stop the daemons with:
      $ sbin/stop-dfs.sh

YARN on Single Node

You can run a MapReduce job on YARN in a pseudo-distributed mode by setting a few parameters and running ResourceManager daemon and NodeManager daemon in addition.

The following instructions assume that 1. ~ 4. steps of the above instructions are already executed.

  1. Configure parameters as follows:

    etc/hadoop/mapred-site.xml:

    <configuration>
        <property>
            <name>mapreduce.framework.name</name>
            <value>yarn</value>
        </property>
    </configuration>

    etc/hadoop/yarn-site.xml:

    <configuration>
        <property>
            <name>yarn.nodemanager.aux-services</name>
            <value>mapreduce_shuffle</value>
        </property>
    </configuration>
  2. Start ResourceManager daemon and NodeManager daemon:
      $ sbin/start-yarn.sh
  3. Browse the web interface for the ResourceManager; by default it is available at:
    • ResourceManager - http://localhost:8088/
  4. Run a MapReduce job.
  5. When you're done, stop the daemons with:
      $ sbin/stop-yarn.sh

Fully-Distributed Operation

For information on setting up fully-distributed, non-trivial clusters see Cluster Setup.







번역본


Hadoop MapReduce 차세대 - 단일 노드 클러스터 설정.

목적

이 문서는 단일 노드 Hadoop 설치를 설정하고 구성하여 Hadoop MapReduce 및 Hadoop Distributed File System (HDFS)을 사용하여 간단한 작업을 신속하게 수행하는 방법을 설명합니다.

선결 요건

지원 플랫폼

  • GNU / Linux는 개발 및 생산 플랫폼으로 지원됩니다. Hadoop은 2000 노드를 가진 GNU / Linux 클러스터에서 시연되었습니다.
  • Windows도 지원되는 플랫폼이지만 다음 단계는 Linux에만 해당됩니다. Windows에서 Hadoop을 설정하려면 wiki 페이지를 참조 하십시오 .

필수 소프트웨어

Linux에 필요한 소프트웨어는 다음과 같습니다.

  1. Java ™가 설치되어 있어야합니다. 권장 Java 버전은 HadoopJavaVersions에 설명되어 있습니다 .
  2. ssh가 설치되어 있어야하며 원격 Hadoop 데몬을 관리하는 Hadoop 스크립트를 사용하려면 sshd가 실행 중이어야합니다.

소프트웨어 설치

클러스터에 필수 소프트웨어가 없다면 설치해야합니다.

예를 들어 Ubuntu Linux의 경우 :

  $ sudo apt-get install ssh 
  $ sudo apt-get install rsync

다운로드

Hadoop 배포본을 얻으려면 아파치 다운로드 미러 (Apache Download Mirrors) 중 하나에서 최신 안정 버전을 다운로드하십시오 .

Hadoop 클러스터 시작 준비

다운로드 한 Hadoop 배포의 압축을 풉니 다. 배포판에서, etc / hadoop / hadoop-env.sh 파일을 편집하여 다음과 같은 매개 변수를 정의하십시오.

  # Java 설치 
  내보내기 의 루트로 설정합니다. JAVA_HOME = / usr / java / latest 

  # 설치 디렉토리가 / usr / local / hadoop이라고 가정합니다. 
  export HADOOP_PREFIX = / usr / local / hadoop

다음 명령을 시도하십시오.

  $ bin / hadoop

그러면 hadoop 스크립트에 대한 사용 설명서가 표시됩니다.

이제 3 가지 지원 모드 중 하나에서 Hadoop 클러스터를 시작할 준비가되었습니다.

독립 실행 형 작업

기본적으로 Hadoop은 비 분산 모드에서 단일 Java 프로세스로 실행되도록 구성됩니다. 이것은 디버깅에 유용합니다.

다음 예제는 압축 해제 된 conf 디렉토리를 복사하여 입력으로 사용하고 주어진 정규 표현식의 모든 일치를 찾아서 표시합니다. 출력은 주어진 출력 디렉토리에 기록됩니다.

  $ mkdir 입력 
  $ cp etc / hadoop / *. xml 입력 
  $ bin / hadoop jar 공유 / hadoop / mapreduce / hadoop-mapreduce-examples-2.5.2.jar grep 입력 출력 'dfs [az.] +' 
  $ cat 출력 / *

의사 분산 작업

또한 Hadoop은 각 Hadoop 데몬이 별도의 Java 프로세스에서 실행되는 의사 배포 모드의 단일 노드에서 실행할 수 있습니다.

구성

다음을 사용하십시오.

etc / hadoop / core-site.xml :

<configuration> 
    <property> 
        <name> fs.defaultFS </ name> 
        <value> hdfs : // localhost : 9000 </ value> 
    </ property> 
</ configuration>

etc / hadoop / hdfs-site.xml :

<configuration> 
    <property> 
        <name> dfs.replication </ name> 
        <value> 1 </ value> 
    </ property> 
</ configuration>

설치 passphraseless ssh

이제 passphrase없이 localhost로 ssh 할 수 있는지 확인하십시오 :

  $ ssh localhost

passphrase없이 localhost로 ssh 할 수 없다면 다음 명령을 실행하십시오 :

  $ ssh-keygen -t dsa -P "-f ~ / .ssh / id_dsa 
  $ cat ~ / .ssh / id_dsa.pub >> ~ / .ssh / authorized_keys

실행

다음 지시 사항은 MapReduce 작업을 로컬에서 실행하는 것입니다. YARN에서 작업을 실행하려면 단일 노드의 YARN을 참조하십시오 .

  1. 파일 시스템을 포맷하십시오 :
      $ bin / hdfs namenode -format
  2. NameNode 데몬 및 DataNode 데몬 시작 :
      $ sbin / start-dfs.sh

    hadoop 데몬 로그 출력은 HADOOP_LOG_DIR 디렉토리에 기록됩니다 (기본값은 HADOOP_HOME / logs입니다 ).

  3. NameNode에 대한 웹 인터페이스를 탐색하십시오. 기본적으로 다음 위치에서 사용할 수 있습니다.
    • NameNode - http : // localhost : 50070 /
  4. MapReduce 작업을 실행하는 데 필요한 HDFS 디렉토리를 만듭니다.
      $ bin / hdfs dfs -mkdir / 사용자 
      $ bin / hdfs dfs -mkdir / user / <username>
  5. 입력 파일을 분산 파일 시스템에 복사하십시오.
      $ bin / hdfs dfs -put etc / hadoop 입력
  6. 제공되는 몇 가지 예제를 실행하십시오.
      $ bin / hadoop jar share / hadoop / mapreduce / hadoop-mapreduce-examples-2.5.2.jar grep 입력 출력 'dfs [az.] +'
  7. 출력 파일을 검사하십시오.

    출력 파일을 분산 파일 시스템에서 로컬 파일 시스템으로 복사하고 검사하십시오 :

      $ bin / hdfs dfs -get 출력 출력 
      $ cat 출력 / *

    또는

    분산 파일 시스템의 출력 파일보기 :

      $ bin / hdfs dfs -cat 출력 / *
  8. 완료되면 데몬을 중지하십시오.
      $ sbin / stop-dfs.sh

단일 노드의 YARN

몇 개의 매개 변수를 설정하고 ResourceManager 데몬과 NodeManager 데몬을 추가로 실행하여 의사 배포 모드에서 YARN의 MapReduce 작업을 실행할 수 있습니다.

다음 명령어 는 위의 명령어 의 1 ~ 4. 단계 가 이미 실행 되었다고 가정합니다 .

  1. 다음과 같이 매개 변수를 구성하십시오.

    etc / hadoop / mapred-site.xml :

    <configuration> 
        <property> 
            <name> mapreduce.framework.name </ name> 
            <value> 원사 </ value> 
        </ property> 
    </ configuration>

    etc / hadoop / yarn-site.xml :

    <configuration> 
        <property> 
            <name> yarn.nodemanager.aux-services </ name> 
            <value> mapreduce_shuffle </ value> 
        </ property> 
    </ configuration>
  2. ResourceManager 데몬 및 NodeManager 데몬 시작 :
      $ sbin / start-yarn.sh
  3. ResourceManager의 웹 인터페이스를 탐색하십시오. 기본적으로 다음 위치에서 사용할 수 있습니다.
    • ResourceManager - http : // localhost : 8088 /
  4. MapReduce 작업을 실행하십시오.
  5. 완료되면 데몬을 중지하십시오.
      $ sbin / stop-yarn.sh

완전히 분산 된 작업

완전 분산 형 클러스터 설정에 대한 정보는 클러스터 설정을 참조하십시오 .



https://hadoop.apache.org/docs/r2.5.2/hadoop-project-dist/hadoop-common/SingleCluster.html


+ Recent posts