需求需要将Hadoop的数据插入到MongoDB。
数据类型是将字符串转换成一个类似Map的对象,插入到数据库中。以替换原有的单线程接口。
import java.io.IOException; import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.StringUtils; import com.mongodb.BasicDBObject; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBObject; import com.mongodb.Mongo; public class MongoOutputFormat implements OutputFormat<Text, Text> { @Override public void checkOutputSpecs(FileSystem arg0, JobConf arg1) throws IOException { // TODO Auto-generated method stub System.out.println("OutputFormat CheckOutpuSpecs() function is not supported~!"); } //实现OutputFormat接口的时候,返回一个RecordWriter对象。 //这里可以实例化数据库连接JDBC对象,和RecordWriter共一个生命周期。 //数据库连接串的相关对象,通过JobConf传入。 @Override public RecordWriter<Text, Text> getRecordWriter(FileSystem arg0, JobConf conf, String arg2, Progressable arg3) throws IOException { // Configuration conf = jobconf.g.getConfiguration() ; String ip = conf.get("mongoIp"); String port = conf.get("mongoPort"); Mongo mongo = new Mongo(ip,Integer.parseInt(port)); String username = conf.get("muser"); String password = conf.get("mpwd"); String dbname = conf.get("mongoDb"); String collectionName = conf.get("mongoCollection"); try { return new MongoDBRecordWriter(mongo,dbname,collectionName,username,password); } catch (Exception ex) { throw new IOException(ex); } } /** * A RecordWriter that writes the reduce output to a SQL table or MongoDB Collection! */ public static class MongoDBRecordWriter implements RecordWriter<Text, Text> { private DBCollection coll; private Mongo mongo; public MongoDBRecordWriter() throws SQLException { } //使用这个构造函数 public MongoDBRecordWriter(DBCollection coll) { this.coll = coll; } public MongoDBRecordWriter(Mongo mongo, String dbname, String collectionName, String username, String password) { this.mongo = mongo; DB d = this.mongo.getDB(dbname); d.authenticate(username, password.toCharArray()); this.coll = d.getCollection(collectionName); } public DBCollection getCollection() { return coll; } // public PreparedStatement getStatement() { // return statement; // } @Override /** Close函数,用于关闭OutputFormat中用到的资源对象 */ public void close(Reporter arg0) throws IOException { try { this.mongo.close(); } catch (Exception e) { try { System.out.println("Close() is not supported here..."); } catch (Exception ex) { ex.printStackTrace(); } throw new IOException(e); } finally { try { System.out.println("Close() is not supported here..."); } catch (Exception ex) { ex.printStackTrace(); } } } //RecordWriter中输出的方法,必须实现的。 @Override public void write(Text key, Text value) throws IOException { try { String line = value.toString(); String[] rs = line.split("\001"); Map m = new HashMap(); m.put("created_by", rs[7]); m.put("created_date", rs[8]); m.put("updated_by", rs[9]); m.put("updated_date", rs[10]); DBObject dbObj = new BasicDBObject(); dbObj.putAll(m); coll.save(dbObj); } catch (Exception e) { // LoggingUtils.logAll(LOG, "Exception encountered", e);. System.err.print(e); e.printStackTrace(); } } } }
相关推荐
go-mysql-mongodb是一项将MySQL数据自动同步到MongoDB的服务。 它首先使用mysqldump来获取原始数据,然后与binlog增量同步数据。 安装 安装Go( )并设置您的 go get github.com/WangXiangUSTC/go-mysql-mongodb ...
python开发,基于flask-restful 的中小型项目,restful风格API接口开发实例,以mongodb作为数据库 python开发,基于flask-restful 的中小型项目,restful风格API接口开发实例,以mongodb作为数据库python开发,基于...
同步MongoDB数据到ElasticSearch,支持全量同步、增量同步、实时同步,支持全操作,支持中间数据处理
canal 的 mysql 与 redis/memcached/mongodb 的 nosql 数据实时同步方案
最近由于业务需要,APP端后台需要将MongoDB中的数据同步到Java端后台的MySQL中,然后又将MySQL中算好的数据,同步到MongoDB数据库。 这个过程看是很繁琐,实际上这就是一个互相写表的过程。 接下来就看看node.js将...
同步Mongodb数据库,用于同步数据库,局域网内
C#\MongoDB应用开发实战\MongoDB
Laravel开发-eloquent-mongodb-repository 雄辩的MongoDB存储库实现
MEAN架构编程开发。全栈开发之道:MongoDB Express AngularJS Node.js
前端 Vue+Node+MongoDB高级全栈开发
Laravel开发-passport-mongodb Laravel Passport为Laravel和MongoDB提供OAuth2服务器支持。
Laravel开发-laravel-mongodb-passport Laravel MongoDB护照
Laravel开发-mongodb MongoDB 数据库驱动 让Laravel的雄辩模型支持MongoDB
Laravel开发-laravel-mongodb-log 用于Laravel的MongoDB日志记录
本课程是一套关于MongoDB应用开发的实战性教程,名为《深入浅出MongoDB应用实战开发(基础、开发指南、系统管理、集群及系统架构)》,教程侧重于讲解MongoDB的常用特性及高级特性,从实际开发的角度出发对MongoDB...
后台jdbc java 增删改查 数据同步
Laravel开发-laravel-mongodb-permission 基于SPATIE包的MongoDB(Moloquent)对Laravel 5.3的权限处理
用idea做的springboot+mongodb,实现自定义库的查询,,,非默认test库操作,和添加demo
Laravel开发-laravel-mongodb-user-provider 使用mongodb(moloquent)处理Laravel 5.3的用户提供程序
本软件使用c#编写,是SQL转存MongoDB的工具,可独立运行,也可定时运行,利用sql数据库时间戳字段进行更新采集区分。 本软件综合了,windows服务控制(安装卸载等),windows服务启动程序(服务控制定时运行程序),...