zookeeper为了防止,系统宕机或重启导致的数据丢失,会对数据进行定时持久化。有两种持久化方式:
1.为每次事务操作记录到日志文件,这样就可以通过执行这些日志文件来恢复数据。
2.为了加快ZooKeeper恢复的速度,ZooKeeper还提供了对树结构和session信息进行数据快照持久化的操作。
日志文件
日志文件记录zookeeper服务器上的每一次事务操作。
日志文件格式:log.ZXID,ZXID非常重要,它表示该文件起始的事务id。
数据快照
数据快照用来记录zookeeper服务器上某一时刻的全量内存数据内容,并写入指定磁盘文件中。
数据快照文件格式:snapshot.ZXID,ZXID非常重要,ZooKeeper会根据ZXID来确定数据恢复的起始点。
镜像文件主要存储zookeeper的树结构和session信息。
类图
FileTxnSnapLog
是操作数据持久化的核心类,底层通过TxnLog和SnapShot来分别操作日志文件和数据快照。
存储数据快照
public void save(DataTree dataTree, ConcurrentHashMapsessionsWithTimeouts) throws IOException { long lastZxid = dataTree.lastProcessedZxid; LOG.info("Snapshotting: " + Long.toHexString(lastZxid)); File snapshot=new File( snapDir, Util.makeSnapshotName(lastZxid)); snapLog.serialize(dataTree, sessionsWithTimeouts, snapshot); }
日志文件操作
public boolean append(Request si) throws IOException { return txnLog.append(si.hdr, si.txn); } public void commit() throws IOException { txnLog.commit(); } public void rollLog() throws IOException { txnLog.rollLog(); }
数据恢复
public long restore(DataTree dt, Mapsessions, PlayBackListener listener) throws IOException { snapLog.deserialize(dt, sessions); FileTxnLog txnLog = new FileTxnLog(dataDir); TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; while (true) { hdr = itr.getHeader(); ...if (hdr.getZxid() < highestZxid && highestZxid != 0) { LOG.error(highestZxid + "(higestZxid) > " + hdr.getZxid() + "(next log) for type " + hdr.getType()); } else { highestZxid = hdr.getZxid(); } try { processTransaction(hdr,dt,sessions, itr.getTxn()); } catch(KeeperException.NoNodeException e) { throw new IOException("Failed to process transaction type: " + hdr.getType() + " error: " + e.getMessage()); } if (!itr.next()) break; } return highestZxid; }
FileTxnLog
负责维护事务日志对外的接口,包括事务日志的写入和读取等。
写入事务日志
1.如果日志文件打开,使用该日志文件;如果没有,使用该事务的zxid做为后缀,创建新的日志文件。
2.如果当前日志文件剩余空间不足4kb,对日志文件扩容到64mb,使用0来填充。预分配的原因是提高io效率。
3.对事务的头和事务体序列号
4.生成checksum
5.写入文件流。
public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException { if (hdr != null) { ... if (logStream==null) { ... logFileWrite = new File(logDir, ("log." + Long.toHexString(hdr.getZxid()))); fos = new FileOutputStream(logFileWrite); logStream=new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); fhdr.serialize(oa, "fileheader"); // Make sure that the magic number is written before padding. logStream.flush(); currentSize = fos.getChannel().position(); streamsToFlush.add(fos); } padFile(fos); byte[] buf = Util.marshallTxnEntry(hdr, txn); ... Checksum crc = makeChecksumAlgorithm(); crc.update(buf, 0, buf.length); oa.writeLong(crc.getValue(), "txnEntryCRC"); Util.writeTxnBytes(oa, buf); return true; } return false; }
持久化本质是将内存中对象数据以二进制的方式存储到磁盘上,这个过程,底层通过jute来序列号。
序列化和反序列化的本质就是数据流与对象数据之间的变换。jute的序列化理念是让需要序列化的对象自己定义序列化协议。所以使用jute进行序列化的对象需要实现Record接口,该接口需要对象实现序列化和反序列化方法。此外jute还对序列化的流进行了抽象,OutputArchive代表输入流,InputArchive代表输入流,各种类型流的读写通过实现这两个接口实现。通过实现Record接口,对象定义序列化和反序列化的协议;通过实现OutputArchive和InputArchive,实现数据存储和读取。
Record代码:
1 public interface Record {2 public void serialize(OutputArchive archive, String tag)3 throws IOException;4 public void deserialize(InputArchive archive, String tag)5 throws IOException;6 }
OutputArchive代码:
1 public interface OutputArchive { 2 public void writeByte(byte b, String tag) throws IOException; 3 public void writeBool(boolean b, String tag) throws IOException; 4 public void writeInt(int i, String tag) throws IOException; 5 public void writeLong(long l, String tag) throws IOException; 6 public void writeFloat(float f, String tag) throws IOException; 7 public void writeDouble(double d, String tag) throws IOException; 8 public void writeString(String s, String tag) throws IOException; 9 public void writeBuffer(byte buf[], String tag)10 throws IOException;11 public void writeRecord(Record r, String tag) throws IOException;12 public void startRecord(Record r, String tag) throws IOException;13 public void endRecord(Record r, String tag) throws IOException;14 public void startVector(List v, String tag) throws IOException;15 public void endVector(List v, String tag) throws IOException;16 public void startMap(TreeMap v, String tag) throws IOException;17 public void endMap(TreeMap v, String tag) throws IOException;18 19 }
InputArchive代码:
1 public interface InputArchive { 2 public byte readByte(String tag) throws IOException; 3 public boolean readBool(String tag) throws IOException; 4 public int readInt(String tag) throws IOException; 5 public long readLong(String tag) throws IOException; 6 public float readFloat(String tag) throws IOException; 7 public double readDouble(String tag) throws IOException; 8 public String readString(String tag) throws IOException; 9 public byte[] readBuffer(String tag) throws IOException;10 public void readRecord(Record r, String tag) throws IOException;11 public void startRecord(String tag) throws IOException;12 public void endRecord(String tag) throws IOException;13 public Index startVector(String tag) throws IOException;14 public void endVector(String tag) throws IOException;15 public Index startMap(String tag) throws IOException;16 public void endMap(String tag) throws IOException;17 }
例如对FileHeader实现序列化,分别在serialize和deserialize方法中定义序列化协议,然后调用相应方法就可以将该对象序列化和反序列化。
1 public class FileHeader implements Record { 2 private int magic; 3 private int version; 4 private long dbid; 5 public void serialize(OutputArchive a_, String tag) throws java.io.IOException { 6 a_.startRecord(this,tag); 7 a_.writeInt(magic,"magic"); 8 a_.writeInt(version,"version"); 9 a_.writeLong(dbid,"dbid");10 a_.endRecord(this,tag);11 }12 public void deserialize(InputArchive a_, String tag) throws java.io.IOException {13 a_.startRecord(tag);14 magic=a_.readInt("magic");15 version=a_.readInt("version");16 dbid=a_.readLong("dbid");17 a_.endRecord(tag);18 }19 }
具体对象会序列化为什么样的数据形式以及从什么样数据形式中反序列化,取决于OutputArchive和InputArchive的实现。
二进制数据流实现:
BinaryOutputArchive:
1 public class BinaryOutputArchive implements OutputArchive { 2 private ByteBuffer bb = ByteBuffer.allocate(1024); 3 private DataOutput out; 4 public static BinaryOutputArchive getArchive(OutputStream strm) { 5 return new BinaryOutputArchive(new DataOutputStream(strm)); 6 } 7 public BinaryOutputArchive(DataOutput out) { 8 this.out = out; 9 } 10 public void writeByte(byte b, String tag) throws IOException { 11 out.writeByte(b); 12 } 13 public void writeBool(boolean b, String tag) throws IOException { 14 out.writeBoolean(b); 15 } 16 public void writeInt(int i, String tag) throws IOException { 17 out.writeInt(i); 18 } 19 public void writeLong(long l, String tag) throws IOException { 20 out.writeLong(l); 21 } 22 public void writeFloat(float f, String tag) throws IOException { 23 out.writeFloat(f); 24 } 25 public void writeDouble(double d, String tag) throws IOException { 26 out.writeDouble(d); 27 } 28 29 /** 30 * create our own char encoder to utf8. This is faster 31 * then string.getbytes(UTF8). 32 * @param s the string to encode into utf8 33 * @return utf8 byte sequence. 34 */ 35 final private ByteBuffer stringToByteBuffer(CharSequence s) { 36 bb.clear(); 37 final int len = s.length(); 38 for (int i = 0; i < len; i++) { 39 if (bb.remaining() < 3) { 40 ByteBuffer n = ByteBuffer.allocate(bb.capacity() << 1); 41 bb.flip(); 42 n.put(bb); 43 bb = n; 44 } 45 char c = s.charAt(i); 46 if (c < 0x80) { 47 bb.put((byte) c); 48 } else if (c < 0x800) { 49 bb.put((byte) (0xc0 | (c >> 6))); 50 bb.put((byte) (0x80 | (c & 0x3f))); 51 } else { 52 bb.put((byte) (0xe0 | (c >> 12))); 53 bb.put((byte) (0x80 | ((c >> 6) & 0x3f))); 54 bb.put((byte) (0x80 | (c & 0x3f))); 55 } 56 } 57 bb.flip(); 58 return bb; 59 } 60 61 public void writeString(String s, String tag) throws IOException { 62 if (s == null) { 63 writeInt(-1, "len"); 64 return; 65 } 66 ByteBuffer bb = stringToByteBuffer(s); 67 writeInt(bb.remaining(), "len"); 68 out.write(bb.array(), bb.position(), bb.limit()); 69 } 70 71 public void writeBuffer(byte barr[], String tag) 72 throws IOException { 73 if (barr == null) { 74 out.writeInt(-1); 75 return; 76 } 77 out.writeInt(barr.length); 78 out.write(barr); 79 } 80 81 public void writeRecord(Record r, String tag) throws IOException { 82 r.serialize(this, tag); 83 } 84 public void startRecord(Record r, String tag) throws IOException {} 85 86 public void endRecord(Record r, String tag) throws IOException {} 87 88 public void startVector(List v, String tag) throws IOException { 89 if (v == null) { 90 writeInt(-1, tag); 91 return; 92 } 93 writeInt(v.size(), tag); 94 } 95 public void endVector(List v, String tag) throws IOException {} 96 97 public void startMap(TreeMap v, String tag) throws IOException { 98 writeInt(v.size(), tag); 99 }100 public void endMap(TreeMap v, String tag) throws IOException {}101 }
BinaryInputArchive:
1 public class BinaryInputArchive implements InputArchive { 2 3 private DataInput in; 4 5 static public BinaryInputArchive getArchive(InputStream strm) { 6 return new BinaryInputArchive(new DataInputStream(strm)); 7 } 8 9 static private class BinaryIndex implements Index { 10 private int nelems; 11 BinaryIndex(int nelems) { 12 this.nelems = nelems; 13 } 14 public boolean done() { 15 return (nelems <= 0); 16 } 17 public void incr() { 18 nelems--; 19 } 20 } 21 /** Creates a new instance of BinaryInputArchive */ 22 public BinaryInputArchive(DataInput in) { 23 this.in = in; 24 } 25 26 public byte readByte(String tag) throws IOException { 27 return in.readByte(); 28 } 29 30 public boolean readBool(String tag) throws IOException { 31 return in.readBoolean(); 32 } 33 34 public int readInt(String tag) throws IOException { 35 return in.readInt(); 36 } 37 38 public long readLong(String tag) throws IOException { 39 return in.readLong(); 40 } 41 42 public float readFloat(String tag) throws IOException { 43 return in.readFloat(); 44 } 45 46 public double readDouble(String tag) throws IOException { 47 return in.readDouble(); 48 } 49 50 public String readString(String tag) throws IOException { 51 int len = in.readInt(); 52 if (len == -1) return null; 53 byte b[] = new byte[len]; 54 in.readFully(b); 55 return new String(b, "UTF8"); 56 } 57 58 static public final int maxBuffer = determineMaxBuffer(); 59 private static int determineMaxBuffer() { 60 String maxBufferString = System.getProperty("jute.maxbuffer"); 61 try { 62 return Integer.parseInt(maxBufferString); 63 } catch(Exception e) { 64 return 0xfffff; 65 } 66 67 } 68 public byte[] readBuffer(String tag) throws IOException { 69 int len = readInt(tag); 70 if (len == -1) return null; 71 if (len < 0 || len > maxBuffer) { 72 throw new IOException("Unreasonable length = " + len); 73 } 74 byte[] arr = new byte[len]; 75 in.readFully(arr); 76 return arr; 77 } 78 79 public void readRecord(Record r, String tag) throws IOException { 80 r.deserialize(this, tag); 81 } 82 83 public void startRecord(String tag) throws IOException {} 84 85 public void endRecord(String tag) throws IOException {} 86 87 public Index startVector(String tag) throws IOException { 88 int len = readInt(tag); 89 if (len == -1) { 90 return null; 91 } 92 return new BinaryIndex(len); 93 } 94 95 public void endVector(String tag) throws IOException {} 96 97 public Index startMap(String tag) throws IOException { 98 return new BinaryIndex(readInt(tag)); 99 }100 101 public void endMap(String tag) throws IOException {}102 103 }
其他的实现还有,cvs文件(CsvInputArchive,CsvOutputArchive);xml文件(XmlInputArchive,XmlOutputArchive)。